events.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. import datetime
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from pathlib import Path
  8. from peewee import fn
  9. from frigate.config import EventsConfig, FrigateConfig, RecordConfig
  10. from frigate.const import CLIPS_DIR
  11. from frigate.models import Event
  12. logger = logging.getLogger(__name__)
  13. def should_update_db(prev_event, current_event):
  14. return (
  15. prev_event["top_score"] != current_event["top_score"]
  16. or prev_event["entered_zones"] != current_event["entered_zones"]
  17. or prev_event["thumbnail"] != current_event["thumbnail"]
  18. or prev_event["has_clip"] != current_event["has_clip"]
  19. or prev_event["has_snapshot"] != current_event["has_snapshot"]
  20. )
  21. class EventProcessor(threading.Thread):
  22. def __init__(
  23. self, config, camera_processes, event_queue, event_processed_queue, stop_event
  24. ):
  25. threading.Thread.__init__(self)
  26. self.name = "event_processor"
  27. self.config = config
  28. self.camera_processes = camera_processes
  29. self.cached_clips = {}
  30. self.event_queue = event_queue
  31. self.event_processed_queue = event_processed_queue
  32. self.events_in_process = {}
  33. self.stop_event = stop_event
  34. def run(self):
  35. # set an end_time on events without an end_time on startup
  36. Event.update(end_time=Event.start_time + 30).where(
  37. Event.end_time == None
  38. ).execute()
  39. while not self.stop_event.is_set():
  40. try:
  41. event_type, camera, event_data = self.event_queue.get(timeout=10)
  42. except queue.Empty:
  43. continue
  44. logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
  45. event_config: EventsConfig = self.config.cameras[camera].record.events
  46. if event_type == "start":
  47. self.events_in_process[event_data["id"]] = event_data
  48. elif event_type == "update" and should_update_db(
  49. self.events_in_process[event_data["id"]], event_data
  50. ):
  51. self.events_in_process[event_data["id"]] = event_data
  52. # TODO: this will generate a lot of db activity possibly
  53. if event_data["has_clip"] or event_data["has_snapshot"]:
  54. Event.replace(
  55. id=event_data["id"],
  56. label=event_data["label"],
  57. camera=camera,
  58. start_time=event_data["start_time"] - event_config.pre_capture,
  59. end_time=None,
  60. top_score=event_data["top_score"],
  61. false_positive=event_data["false_positive"],
  62. zones=list(event_data["entered_zones"]),
  63. thumbnail=event_data["thumbnail"],
  64. region=event_data["region"],
  65. box=event_data["box"],
  66. area=event_data["area"],
  67. has_clip=event_data["has_clip"],
  68. has_snapshot=event_data["has_snapshot"],
  69. ).execute()
  70. elif event_type == "end":
  71. if event_data["has_clip"] or event_data["has_snapshot"]:
  72. Event.replace(
  73. id=event_data["id"],
  74. label=event_data["label"],
  75. camera=camera,
  76. start_time=event_data["start_time"] - event_config.pre_capture,
  77. end_time=event_data["end_time"] + event_config.post_capture,
  78. top_score=event_data["top_score"],
  79. false_positive=event_data["false_positive"],
  80. zones=list(event_data["entered_zones"]),
  81. thumbnail=event_data["thumbnail"],
  82. region=event_data["region"],
  83. box=event_data["box"],
  84. area=event_data["area"],
  85. has_clip=event_data["has_clip"],
  86. has_snapshot=event_data["has_snapshot"],
  87. ).execute()
  88. del self.events_in_process[event_data["id"]]
  89. self.event_processed_queue.put((event_data["id"], camera))
  90. # set an end_time on events without an end_time before exiting
  91. Event.update(end_time=datetime.datetime.now().timestamp()).where(
  92. Event.end_time == None
  93. ).execute()
  94. logger.info(f"Exiting event processor...")
  95. class EventCleanup(threading.Thread):
  96. def __init__(self, config: FrigateConfig, stop_event):
  97. threading.Thread.__init__(self)
  98. self.name = "event_cleanup"
  99. self.config = config
  100. self.stop_event = stop_event
  101. self.camera_keys = list(self.config.cameras.keys())
  102. def expire(self, media_type):
  103. ## Expire events from unlisted cameras based on the global config
  104. if media_type == "clips":
  105. retain_config = self.config.record.events.retain
  106. file_extension = "mp4"
  107. update_params = {"has_clip": False}
  108. else:
  109. retain_config = self.config.snapshots.retain
  110. file_extension = "jpg"
  111. update_params = {"has_snapshot": False}
  112. distinct_labels = (
  113. Event.select(Event.label)
  114. .where(Event.camera.not_in(self.camera_keys))
  115. .distinct()
  116. )
  117. # loop over object types in db
  118. for l in distinct_labels:
  119. # get expiration time for this label
  120. expire_days = retain_config.objects.get(l.label, retain_config.default)
  121. expire_after = (
  122. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  123. ).timestamp()
  124. # grab all events after specific time
  125. expired_events = Event.select().where(
  126. Event.camera.not_in(self.camera_keys),
  127. Event.start_time < expire_after,
  128. Event.label == l.label,
  129. )
  130. # delete the media from disk
  131. for event in expired_events:
  132. media_name = f"{event.camera}-{event.id}"
  133. media_path = Path(
  134. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  135. )
  136. media_path.unlink(missing_ok=True)
  137. if file_extension == "jpg":
  138. media_path = Path(
  139. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  140. )
  141. media_path.unlink(missing_ok=True)
  142. # update the clips attribute for the db entry
  143. update_query = Event.update(update_params).where(
  144. Event.camera.not_in(self.camera_keys),
  145. Event.start_time < expire_after,
  146. Event.label == l.label,
  147. )
  148. update_query.execute()
  149. ## Expire events from cameras based on the camera config
  150. for name, camera in self.config.cameras.items():
  151. if media_type == "clips":
  152. retain_config = camera.record.events.retain
  153. else:
  154. retain_config = camera.snapshots.retain
  155. # get distinct objects in database for this camera
  156. distinct_labels = (
  157. Event.select(Event.label).where(Event.camera == name).distinct()
  158. )
  159. # loop over object types in db
  160. for l in distinct_labels:
  161. # get expiration time for this label
  162. expire_days = retain_config.objects.get(l.label, retain_config.default)
  163. expire_after = (
  164. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  165. ).timestamp()
  166. # grab all events after specific time
  167. expired_events = Event.select().where(
  168. Event.camera == name,
  169. Event.start_time < expire_after,
  170. Event.label == l.label,
  171. )
  172. # delete the grabbed clips from disk
  173. for event in expired_events:
  174. media_name = f"{event.camera}-{event.id}"
  175. media_path = Path(
  176. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  177. )
  178. media_path.unlink(missing_ok=True)
  179. if file_extension == "jpg":
  180. media_path = Path(
  181. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  182. )
  183. media_path.unlink(missing_ok=True)
  184. # update the clips attribute for the db entry
  185. update_query = Event.update(update_params).where(
  186. Event.camera == name,
  187. Event.start_time < expire_after,
  188. Event.label == l.label,
  189. )
  190. update_query.execute()
  191. def purge_duplicates(self):
  192. duplicate_query = """with grouped_events as (
  193. select id,
  194. label,
  195. camera,
  196. has_snapshot,
  197. has_clip,
  198. row_number() over (
  199. partition by label, camera, round(start_time/5,0)*5
  200. order by end_time-start_time desc
  201. ) as copy_number
  202. from event
  203. )
  204. select distinct id, camera, has_snapshot, has_clip from grouped_events
  205. where copy_number > 1;"""
  206. duplicate_events = Event.raw(duplicate_query)
  207. for event in duplicate_events:
  208. logger.debug(f"Removing duplicate: {event.id}")
  209. media_name = f"{event.camera}-{event.id}"
  210. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
  211. media_path.unlink(missing_ok=True)
  212. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png")
  213. media_path.unlink(missing_ok=True)
  214. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
  215. media_path.unlink(missing_ok=True)
  216. (
  217. Event.delete()
  218. .where(Event.id << [event.id for event in duplicate_events])
  219. .execute()
  220. )
  221. def run(self):
  222. # only expire events every 5 minutes
  223. while not self.stop_event.wait(300):
  224. self.expire("clips")
  225. self.expire("snapshots")
  226. self.purge_duplicates()
  227. # drop events from db where has_clip and has_snapshot are false
  228. delete_query = Event.delete().where(
  229. Event.has_clip == False, Event.has_snapshot == False
  230. )
  231. delete_query.execute()
  232. logger.info(f"Exiting event cleanup...")