events.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import datetime
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from pathlib import Path
  8. from peewee import fn
  9. from frigate.config import EventsConfig, FrigateConfig, RecordConfig
  10. from frigate.const import CLIPS_DIR
  11. from frigate.models import Event
  12. logger = logging.getLogger(__name__)
  13. class EventProcessor(threading.Thread):
  14. def __init__(
  15. self, config, camera_processes, event_queue, event_processed_queue, stop_event
  16. ):
  17. threading.Thread.__init__(self)
  18. self.name = "event_processor"
  19. self.config = config
  20. self.camera_processes = camera_processes
  21. self.cached_clips = {}
  22. self.event_queue = event_queue
  23. self.event_processed_queue = event_processed_queue
  24. self.events_in_process = {}
  25. self.stop_event = stop_event
  26. def run(self):
  27. # set an end_time on events without an end_time on startup
  28. Event.update(end_time=Event.start_time + 30).where(
  29. Event.end_time == None
  30. ).execute()
  31. while not self.stop_event.is_set():
  32. try:
  33. event_type, camera, event_data = self.event_queue.get(timeout=10)
  34. except queue.Empty:
  35. continue
  36. logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
  37. event_config: EventsConfig = self.config.cameras[camera].record.events
  38. if event_type == "start":
  39. self.events_in_process[event_data["id"]] = event_data
  40. elif event_type == "update":
  41. self.events_in_process[event_data["id"]] = event_data
  42. # TODO: this will generate a lot of db activity possibly
  43. if event_data["has_clip"] or event_data["has_snapshot"]:
  44. Event.replace(
  45. id=event_data["id"],
  46. label=event_data["label"],
  47. camera=camera,
  48. start_time=event_data["start_time"] - event_config.pre_capture,
  49. end_time=None,
  50. top_score=event_data["top_score"],
  51. false_positive=event_data["false_positive"],
  52. zones=list(event_data["entered_zones"]),
  53. thumbnail=event_data["thumbnail"],
  54. region=event_data["region"],
  55. box=event_data["box"],
  56. area=event_data["area"],
  57. has_clip=event_data["has_clip"],
  58. has_snapshot=event_data["has_snapshot"],
  59. ).execute()
  60. elif event_type == "end":
  61. if event_data["has_clip"] or event_data["has_snapshot"]:
  62. Event.replace(
  63. id=event_data["id"],
  64. label=event_data["label"],
  65. camera=camera,
  66. start_time=event_data["start_time"] - event_config.pre_capture,
  67. end_time=event_data["end_time"] + event_config.post_capture,
  68. top_score=event_data["top_score"],
  69. false_positive=event_data["false_positive"],
  70. zones=list(event_data["entered_zones"]),
  71. thumbnail=event_data["thumbnail"],
  72. region=event_data["region"],
  73. box=event_data["box"],
  74. area=event_data["area"],
  75. has_clip=event_data["has_clip"],
  76. has_snapshot=event_data["has_snapshot"],
  77. ).execute()
  78. del self.events_in_process[event_data["id"]]
  79. self.event_processed_queue.put((event_data["id"], camera))
  80. # set an end_time on events without an end_time before exiting
  81. Event.update(end_time=datetime.datetime.now().timestamp()).where(
  82. Event.end_time == None
  83. ).execute()
  84. logger.info(f"Exiting event processor...")
  85. class EventCleanup(threading.Thread):
  86. def __init__(self, config: FrigateConfig, stop_event):
  87. threading.Thread.__init__(self)
  88. self.name = "event_cleanup"
  89. self.config = config
  90. self.stop_event = stop_event
  91. self.camera_keys = list(self.config.cameras.keys())
  92. def expire(self, media_type):
  93. ## Expire events from unlisted cameras based on the global config
  94. if media_type == "clips":
  95. retain_config = self.config.record.events.retain
  96. file_extension = "mp4"
  97. update_params = {"has_clip": False}
  98. else:
  99. retain_config = self.config.snapshots.retain
  100. file_extension = "jpg"
  101. update_params = {"has_snapshot": False}
  102. distinct_labels = (
  103. Event.select(Event.label)
  104. .where(Event.camera.not_in(self.camera_keys))
  105. .distinct()
  106. )
  107. # loop over object types in db
  108. for l in distinct_labels:
  109. # get expiration time for this label
  110. expire_days = retain_config.objects.get(l.label, retain_config.default)
  111. expire_after = (
  112. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  113. ).timestamp()
  114. # grab all events after specific time
  115. expired_events = Event.select().where(
  116. Event.camera.not_in(self.camera_keys),
  117. Event.start_time < expire_after,
  118. Event.label == l.label,
  119. )
  120. # delete the media from disk
  121. for event in expired_events:
  122. media_name = f"{event.camera}-{event.id}"
  123. media_path = Path(
  124. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  125. )
  126. media_path.unlink(missing_ok=True)
  127. if file_extension == "jpg":
  128. media_path = Path(
  129. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  130. )
  131. media_path.unlink(missing_ok=True)
  132. # update the clips attribute for the db entry
  133. update_query = Event.update(update_params).where(
  134. Event.camera.not_in(self.camera_keys),
  135. Event.start_time < expire_after,
  136. Event.label == l.label,
  137. )
  138. update_query.execute()
  139. ## Expire events from cameras based on the camera config
  140. for name, camera in self.config.cameras.items():
  141. if media_type == "clips":
  142. retain_config = camera.record.events.retain
  143. else:
  144. retain_config = camera.snapshots.retain
  145. # get distinct objects in database for this camera
  146. distinct_labels = (
  147. Event.select(Event.label).where(Event.camera == name).distinct()
  148. )
  149. # loop over object types in db
  150. for l in distinct_labels:
  151. # get expiration time for this label
  152. expire_days = retain_config.objects.get(l.label, retain_config.default)
  153. expire_after = (
  154. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  155. ).timestamp()
  156. # grab all events after specific time
  157. expired_events = Event.select().where(
  158. Event.camera == name,
  159. Event.start_time < expire_after,
  160. Event.label == l.label,
  161. )
  162. # delete the grabbed clips from disk
  163. for event in expired_events:
  164. media_name = f"{event.camera}-{event.id}"
  165. media_path = Path(
  166. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  167. )
  168. media_path.unlink(missing_ok=True)
  169. if file_extension == "jpg":
  170. media_path = Path(
  171. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  172. )
  173. media_path.unlink(missing_ok=True)
  174. # update the clips attribute for the db entry
  175. update_query = Event.update(update_params).where(
  176. Event.camera == name,
  177. Event.start_time < expire_after,
  178. Event.label == l.label,
  179. )
  180. update_query.execute()
  181. def purge_duplicates(self):
  182. duplicate_query = """with grouped_events as (
  183. select id,
  184. label,
  185. camera,
  186. has_snapshot,
  187. has_clip,
  188. row_number() over (
  189. partition by label, camera, round(start_time/5,0)*5
  190. order by end_time-start_time desc
  191. ) as copy_number
  192. from event
  193. )
  194. select distinct id, camera, has_snapshot, has_clip from grouped_events
  195. where copy_number > 1;"""
  196. duplicate_events = Event.raw(duplicate_query)
  197. for event in duplicate_events:
  198. logger.debug(f"Removing duplicate: {event.id}")
  199. media_name = f"{event.camera}-{event.id}"
  200. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
  201. media_path.unlink(missing_ok=True)
  202. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png")
  203. media_path.unlink(missing_ok=True)
  204. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
  205. media_path.unlink(missing_ok=True)
  206. (
  207. Event.delete()
  208. .where(Event.id << [event.id for event in duplicate_events])
  209. .execute()
  210. )
  211. def run(self):
  212. # only expire events every 5 minutes
  213. while not self.stop_event.wait(300):
  214. self.expire("clips")
  215. self.expire("snapshots")
  216. self.purge_duplicates()
  217. # drop events from db where has_clip and has_snapshot are false
  218. delete_query = Event.delete().where(
  219. Event.has_clip == False, Event.has_snapshot == False
  220. )
  221. delete_query.execute()
  222. logger.info(f"Exiting event cleanup...")