123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- import datetime
- import logging
- import os
- import queue
- import threading
- import time
- from pathlib import Path
- from peewee import fn
- from frigate.config import EventsConfig, FrigateConfig, RecordConfig
- from frigate.const import CLIPS_DIR
- from frigate.models import Event
- logger = logging.getLogger(__name__)
- class EventProcessor(threading.Thread):
- def __init__(
- self, config, camera_processes, event_queue, event_processed_queue, stop_event
- ):
- threading.Thread.__init__(self)
- self.name = "event_processor"
- self.config = config
- self.camera_processes = camera_processes
- self.cached_clips = {}
- self.event_queue = event_queue
- self.event_processed_queue = event_processed_queue
- self.events_in_process = {}
- self.stop_event = stop_event
- def run(self):
- while not self.stop_event.is_set():
- try:
- event_type, camera, event_data = self.event_queue.get(timeout=10)
- except queue.Empty:
- continue
- logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
- if event_type == "start":
- self.events_in_process[event_data["id"]] = event_data
- if event_type == "end":
- event_config: EventsConfig = self.config.cameras[camera].record.events
- if event_data["has_clip"] or event_data["has_snapshot"]:
- Event.create(
- id=event_data["id"],
- label=event_data["label"],
- camera=camera,
- start_time=event_data["start_time"] - event_config.pre_capture,
- end_time=event_data["end_time"] + event_config.post_capture,
- top_score=event_data["top_score"],
- false_positive=event_data["false_positive"],
- zones=list(event_data["entered_zones"]),
- thumbnail=event_data["thumbnail"],
- region=event_data["region"],
- box=event_data["box"],
- area=event_data["area"],
- has_clip=event_data["has_clip"],
- has_snapshot=event_data["has_snapshot"],
- )
- del self.events_in_process[event_data["id"]]
- self.event_processed_queue.put((event_data["id"], camera))
- logger.info(f"Exiting event processor...")
- class EventCleanup(threading.Thread):
- def __init__(self, config: FrigateConfig, stop_event):
- threading.Thread.__init__(self)
- self.name = "event_cleanup"
- self.config = config
- self.stop_event = stop_event
- self.camera_keys = list(self.config.cameras.keys())
- def expire(self, media_type):
- ## Expire events from unlisted cameras based on the global config
- if media_type == "clips":
- retain_config = self.config.record.events.retain
- file_extension = "mp4"
- update_params = {"has_clip": False}
- else:
- retain_config = self.config.snapshots.retain
- file_extension = "jpg"
- update_params = {"has_snapshot": False}
- distinct_labels = (
- Event.select(Event.label)
- .where(Event.camera.not_in(self.camera_keys))
- .distinct()
- )
- # loop over object types in db
- for l in distinct_labels:
- # get expiration time for this label
- expire_days = retain_config.objects.get(l.label, retain_config.default)
- expire_after = (
- datetime.datetime.now() - datetime.timedelta(days=expire_days)
- ).timestamp()
- # grab all events after specific time
- expired_events = Event.select().where(
- Event.camera.not_in(self.camera_keys),
- Event.start_time < expire_after,
- Event.label == l.label,
- )
- # delete the media from disk
- for event in expired_events:
- media_name = f"{event.camera}-{event.id}"
- media_path = Path(
- f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
- )
- media_path.unlink(missing_ok=True)
- if file_extension == "jpg":
- media_path = Path(
- f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
- )
- media_path.unlink(missing_ok=True)
- # update the clips attribute for the db entry
- update_query = Event.update(update_params).where(
- Event.camera.not_in(self.camera_keys),
- Event.start_time < expire_after,
- Event.label == l.label,
- )
- update_query.execute()
- ## Expire events from cameras based on the camera config
- for name, camera in self.config.cameras.items():
- if media_type == "clips":
- retain_config = camera.record.events.retain
- else:
- retain_config = camera.snapshots.retain
- # get distinct objects in database for this camera
- distinct_labels = (
- Event.select(Event.label).where(Event.camera == name).distinct()
- )
- # loop over object types in db
- for l in distinct_labels:
- # get expiration time for this label
- expire_days = retain_config.objects.get(l.label, retain_config.default)
- expire_after = (
- datetime.datetime.now() - datetime.timedelta(days=expire_days)
- ).timestamp()
- # grab all events after specific time
- expired_events = Event.select().where(
- Event.camera == name,
- Event.start_time < expire_after,
- Event.label == l.label,
- )
- # delete the grabbed clips from disk
- for event in expired_events:
- media_name = f"{event.camera}-{event.id}"
- media_path = Path(
- f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
- )
- media_path.unlink(missing_ok=True)
- if file_extension == "jpg":
- media_path = Path(
- f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
- )
- media_path.unlink(missing_ok=True)
- # update the clips attribute for the db entry
- update_query = Event.update(update_params).where(
- Event.camera == name,
- Event.start_time < expire_after,
- Event.label == l.label,
- )
- update_query.execute()
- def purge_duplicates(self):
- duplicate_query = """with grouped_events as (
- select id,
- label,
- camera,
- has_snapshot,
- has_clip,
- row_number() over (
- partition by label, camera, round(start_time/5,0)*5
- order by end_time-start_time desc
- ) as copy_number
- from event
- )
- select distinct id, camera, has_snapshot, has_clip from grouped_events
- where copy_number > 1;"""
- duplicate_events = Event.raw(duplicate_query)
- for event in duplicate_events:
- logger.debug(f"Removing duplicate: {event.id}")
- media_name = f"{event.camera}-{event.id}"
- media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
- media_path.unlink(missing_ok=True)
- media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png")
- media_path.unlink(missing_ok=True)
- media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
- media_path.unlink(missing_ok=True)
- (
- Event.delete()
- .where(Event.id << [event.id for event in duplicate_events])
- .execute()
- )
- def run(self):
- # only expire events every 5 minutes
- while not self.stop_event.wait(300):
- self.expire("clips")
- self.expire("snapshots")
- self.purge_duplicates()
- # drop events from db where has_clip and has_snapshot are false
- delete_query = Event.delete().where(
- Event.has_clip == False, Event.has_snapshot == False
- )
- delete_query.execute()
- logger.info(f"Exiting event cleanup...")
|