123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582 |
- import base64
- import json
- import os
- from typing import Dict
- import cv2
- import matplotlib.pyplot as plt
- import numpy as np
- import voluptuous as vol
- import yaml
# `detectors` section: maps a detector name to its backend settings.
DETECTORS_SCHEMA = vol.Schema(
    {
        vol.Required(str): {
            vol.Required('type', default='edgetpu'): vol.In(['cpu', 'edgetpu']),
            vol.Optional('device', default='usb'): str,
        }
    }
)

# Default for the top-level `detectors` key (see FRIGATE_CONFIG_SCHEMA).
DEFAULT_DETECTORS = {
    'coral': {
        'type': 'edgetpu',
        'device': 'usb',
    }
}

# `mqtt` section: only `host` is mandatory; `user`/`password` are optional
# and carry no default.
MQTT_SCHEMA = vol.Schema(
    {
        vol.Required('host'): str,
        vol.Optional('port', default=1883): int,
        vol.Optional('topic_prefix', default='frigate'): str,
        vol.Optional('client_id', default='frigate'): str,
        'user': str,
        'password': str,
    }
)

# Global `save_clips` section: clip length cap and storage directories.
SAVE_CLIPS_SCHEMA = vol.Schema(
    {
        vol.Optional('max_seconds', default=300): int,
        vol.Optional('clips_dir', default='/media/frigate/clips'): str,
        vol.Optional('cache_dir', default='/tmp/cache'): str,
    }
)

# Default ffmpeg argument lists, used by GLOBAL_FFMPEG_SCHEMA below.
FFMPEG_GLOBAL_ARGS_DEFAULT = ['-hide_banner', '-loglevel', 'panic']
FFMPEG_INPUT_ARGS_DEFAULT = [
    '-avoid_negative_ts', 'make_zero',
    '-fflags', 'nobuffer',
    '-flags', 'low_delay',
    '-strict', 'experimental',
    '-fflags', '+genpts+discardcorrupt',
    '-rtsp_transport', 'tcp',
    '-stimeout', '5000000',
    '-use_wallclock_as_timestamps', '1',
]
FFMPEG_OUTPUT_ARGS_DEFAULT = [
    '-f', 'rawvideo',
    '-pix_fmt', 'yuv420p',
]

# Global `ffmpeg` section; every argument list falls back to the defaults above.
GLOBAL_FFMPEG_SCHEMA = vol.Schema(
    {
        vol.Optional('global_args', default=FFMPEG_GLOBAL_ARGS_DEFAULT): [str],
        vol.Optional('hwaccel_args', default=[]): [str],
        vol.Optional('input_args', default=FFMPEG_INPUT_ARGS_DEFAULT): [str],
        vol.Optional('output_args', default=FFMPEG_OUTPUT_ARGS_DEFAULT): [str],
    }
)

# Per-object filter settings, keyed by object label.
FILTER_SCHEMA = vol.Schema(
    {
        str: {
            vol.Optional('min_area', default=0): int,
            vol.Optional('max_area', default=24000000): int,
            vol.Optional('threshold', default=0.85): float,
        }
    }
)
def filters_for_all_tracked_objects(object_config):
    """Give every tracked object an entry in ``object_config['filters']``.

    The tracked labels are read from ``object_config['track']`` (falling back
    to ``['person']``). For each label, a ``filters`` mapping is created if
    absent and an empty filter dict is added for labels that have none.
    Existing filter entries are left untouched.

    The mapping is modified in place and also returned.
    """
    for label in object_config.get('track', ['person']):
        per_object_filters = object_config.setdefault('filters', {})
        per_object_filters.setdefault(label, {})
    return object_config
# `objects` section. filters_for_all_tracked_objects is listed first in
# vol.All so that each tracked object already has a filters entry when the
# mapping schema below is applied.
OBJECTS_SCHEMA = vol.Schema(
    vol.All(
        filters_for_all_tracked_objects,
        {
            vol.Optional('track', default=['person']): [str],
            vol.Optional('filters', default={}): FILTER_SCHEMA.extend(
                {str: {vol.Optional('min_score', default=0.5): float}}
            ),
        },
    )
)

# Default for a camera's `save_clips` key.
DEFAULT_CAMERA_SAVE_CLIPS = {
    'enabled': False,
}

# Default for a camera's `snapshots` key.
DEFAULT_CAMERA_SNAPSHOTS = {
    'show_timestamp': True,
    'draw_zones': False,
    'draw_bounding_boxes': True,
    'crop_to_region': True,
}

# Per-camera `ffmpeg` section: `input` is mandatory, the argument lists are
# optional and have no defaults here.
CAMERA_FFMPEG_SCHEMA = vol.Schema(
    {
        vol.Required('input'): str,
        'global_args': [str],
        'hwaccel_args': [str],
        'input_args': [str],
        'output_args': [str],
    }
)

# `cameras` section: maps a camera name to its settings.
CAMERAS_SCHEMA = vol.Schema(
    {
        str: {
            vol.Required('ffmpeg'): CAMERA_FFMPEG_SCHEMA,
            vol.Required('height'): int,
            vol.Required('width'): int,
            'fps': int,
            'mask': str,
            vol.Optional('best_image_timeout', default=60): int,
            # Zone coordinates are either one string or a list of strings.
            vol.Optional('zones', default={}): {
                str: {
                    vol.Required('coordinates'): vol.Any(str, [str]),
                    vol.Optional('filters', default={}): FILTER_SCHEMA,
                }
            },
            vol.Optional('save_clips', default=DEFAULT_CAMERA_SAVE_CLIPS): {
                vol.Optional('enabled', default=False): bool,
                vol.Optional('pre_capture', default=30): int,
                'objects': [str],
            },
            vol.Optional('snapshots', default=DEFAULT_CAMERA_SNAPSHOTS): {
                vol.Optional('show_timestamp', default=True): bool,
                vol.Optional('draw_zones', default=False): bool,
                vol.Optional('draw_bounding_boxes', default=True): bool,
                vol.Optional('crop_to_region', default=True): bool,
                'height': int,
            },
            'objects': OBJECTS_SCHEMA,
        }
    }
)
# Top-level configuration schema.
#
# `web_port` is declared here because FrigateConfig.__init__ reads
# config['web_port'] from the validated result. Without this entry the key
# has no default, and voluptuous rejects undeclared keys by default, so a
# user could not supply it either.
# NOTE(review): 5000 is assumed to be the intended default port — confirm.
FRIGATE_CONFIG_SCHEMA = vol.Schema(
    {
        vol.Optional('web_port', default=5000): int,
        vol.Optional('detectors', default=DEFAULT_DETECTORS): DETECTORS_SCHEMA,
        # NOTE(review): `mqtt` is optional here, but FrigateConfig indexes
        # config['mqtt'] unconditionally.
        'mqtt': MQTT_SCHEMA,
        vol.Optional('save_clips', default={}): SAVE_CLIPS_SCHEMA,
        vol.Optional('ffmpeg', default={}): GLOBAL_FFMPEG_SCHEMA,
        vol.Optional('objects', default={}): OBJECTS_SCHEMA,
        vol.Required('cameras', default={}): CAMERAS_SCHEMA,
    }
)
class DetectorConfig:
    """Read-only view of one detector entry (``type`` and ``device``)."""

    def __init__(self, config):
        detector_type, device = config['type'], config['device']
        self._type = detector_type
        self._device = device

    @property
    def type(self):
        """Detector type as given in the config mapping."""
        return self._type

    @property
    def device(self):
        """Device string as given in the config mapping."""
        return self._device
class MqttConfig:
    """Read-only view of the ``mqtt`` config section.

    ``host``, ``port``, ``topic_prefix`` and ``client_id`` must be present in
    the mapping; ``user`` and ``password`` are ``None`` when absent.
    """

    def __init__(self, config):
        # Mandatory keys are indexed directly so a missing one raises KeyError.
        for key in ('host', 'port', 'topic_prefix', 'client_id'):
            setattr(self, f'_{key}', config[key])
        # Credentials are optional.
        for key in ('user', 'password'):
            setattr(self, f'_{key}', config.get(key))

    @property
    def host(self):
        return self._host

    @property
    def port(self):
        return self._port

    @property
    def topic_prefix(self):
        return self._topic_prefix

    @property
    def client_id(self):
        return self._client_id

    @property
    def user(self):
        return self._user

    @property
    def password(self):
        return self._password
class SaveClipsConfig:
    """Read-only view of the global ``save_clips`` config section."""

    def __init__(self, config):
        # All three keys are required in the mapping passed in.
        max_seconds = config['max_seconds']
        clips_dir = config['clips_dir']
        cache_dir = config['cache_dir']
        self._max_seconds = max_seconds
        self._clips_dir = clips_dir
        self._cache_dir = cache_dir

    @property
    def max_seconds(self):
        return self._max_seconds

    @property
    def clips_dir(self):
        return self._clips_dir

    @property
    def cache_dir(self):
        return self._cache_dir
class FfmpegConfig:
    """Effective ffmpeg settings for one camera.

    Each argument list is taken from the camera-level ``config`` when present
    and from ``global_config`` otherwise. ``input`` comes from ``config`` only
    and is ``None`` when missing.
    """

    _ARG_KEYS = ('global_args', 'hwaccel_args', 'input_args', 'output_args')

    def __init__(self, global_config, config):
        self._input = config.get('input')
        for key in self._ARG_KEYS:
            # The global value is looked up even when the camera overrides it,
            # so every key must exist in global_config.
            fallback = global_config[key]
            setattr(self, f'_{key}', config.get(key, fallback))

    @property
    def input(self):
        return self._input

    @property
    def global_args(self):
        return self._global_args

    @property
    def hwaccel_args(self):
        return self._hwaccel_args

    @property
    def input_args(self):
        return self._input_args

    @property
    def output_args(self):
        return self._output_args
class FilterConfig:
    """Read-only view of one object filter entry.

    ``min_area``, ``max_area`` and ``threshold`` are required in the mapping;
    ``min_score`` is ``None`` when absent.
    """

    def __init__(self, config):
        for key in ('min_area', 'max_area', 'threshold'):
            setattr(self, f'_{key}', config[key])
        self._min_score = config.get('min_score')

    @property
    def min_area(self):
        return self._min_area

    @property
    def max_area(self):
        return self._max_area

    @property
    def threshold(self):
        return self._threshold

    @property
    def min_score(self):
        return self._min_score
class ObjectConfig:
    """Effective object-tracking settings for one camera.

    ``track`` and ``filters`` each come from the camera-level ``config`` when
    that key is present, and from ``global_config`` otherwise.
    """

    def __init__(self, global_config, config):
        # The global track list is looked up even when the camera overrides it.
        global_track = global_config['track']
        self._track = config.get('track', global_track)

        filter_source = config if 'filters' in config else global_config
        self._filters = {
            label: FilterConfig(settings)
            for label, settings in filter_source['filters'].items()
        }

    @property
    def track(self):
        return self._track

    @property
    def filters(self) -> Dict[str, FilterConfig]:
        return self._filters
class CameraSnapshotsConfig:
    """Read-only view of a camera's ``snapshots`` section.

    The four boolean options are required in the mapping; ``height`` is
    ``None`` when absent.
    """

    def __init__(self, config):
        required = ('show_timestamp', 'draw_zones', 'draw_bounding_boxes', 'crop_to_region')
        for key in required:
            setattr(self, f'_{key}', config[key])
        self._height = config.get('height')

    @property
    def show_timestamp(self):
        return self._show_timestamp

    @property
    def draw_zones(self):
        return self._draw_zones

    @property
    def draw_bounding_boxes(self):
        return self._draw_bounding_boxes

    @property
    def crop_to_region(self):
        return self._crop_to_region

    @property
    def height(self):
        return self._height
class CameraSaveClipsConfig:
    """Read-only view of a camera's ``save_clips`` section.

    ``enabled`` and ``pre_capture`` are required in the mapping; ``objects``
    is ``None`` when absent.
    """

    def __init__(self, config):
        enabled, pre_capture = config['enabled'], config['pre_capture']
        self._enabled = enabled
        self._pre_capture = pre_capture
        self._objects = config.get('objects')

    @property
    def enabled(self):
        return self._enabled

    @property
    def pre_capture(self):
        return self._pre_capture

    @property
    def objects(self):
        return self._objects
class ZoneConfig:
    """One zone of a camera: raw coordinates, parsed contour, filters, color.

    ``coordinates`` may be a list of ``"x,y"`` strings or a single
    comma-separated string of alternating x and y values. Any other type
    prints a message and leaves an empty contour. The color starts as
    ``(0, 0, 0)`` and, like the contour, can be reassigned.
    """

    def __init__(self, name, config):
        raw_coordinates = config['coordinates']
        self._coordinates = raw_coordinates
        self._filters = {
            label: FilterConfig(settings)
            for label, settings in config['filters'].items()
        }

        if isinstance(raw_coordinates, list):
            # One "x,y" string per point.
            vertices = []
            for point in raw_coordinates:
                parts = point.split(',')
                vertices.append([int(parts[0]), int(parts[1])])
            self._contour = np.array(vertices)
        elif isinstance(raw_coordinates, str):
            # Flat "x1,y1,x2,y2,..." string; consume the values pairwise.
            values = raw_coordinates.split(',')
            self._contour = np.array(
                [[int(values[idx]), int(values[idx + 1])] for idx in range(0, len(values), 2)]
            )
        else:
            print(f"Unable to parse zone coordinates for {name}")
            self._contour = np.array([])

        self._color = (0, 0, 0)

    @property
    def coordinates(self):
        return self._coordinates

    @property
    def contour(self):
        return self._contour

    @contour.setter
    def contour(self, val):
        self._contour = val

    @property
    def color(self):
        return self._color

    @color.setter
    def color(self, val):
        self._color = val

    @property
    def filters(self):
        return self._filters
class CameraConfig():
    """Resolved configuration for a single camera.

    Combines the camera's own config mapping with the global ffmpeg and
    object settings, builds the detection mask, assigns a color to each zone
    and precomputes the ffmpeg command line.

    Args:
        name: Camera name; also used in the clip segment file name.
        config: The camera's config mapping.
        cache_dir: Directory where ffmpeg writes clip segments when
            ``save_clips`` is enabled.
        global_ffmpeg: Global ffmpeg mapping used as fallback for argument lists.
        global_objects: Global objects mapping used as fallback for tracking.
    """

    def __init__(self, name, config, cache_dir, global_ffmpeg, global_objects):
        self._name = name
        self._ffmpeg = FfmpegConfig(global_ffmpeg, config['ffmpeg'])
        self._height = config.get('height')
        self._width = config.get('width')
        self._frame_shape = (self._height, self._width)
        # A yuv420p frame takes 1.5x the rows of the visible frame.
        self._frame_shape_yuv = (self._frame_shape[0]*3//2, self._frame_shape[1])
        self._fps = config.get('fps')
        # _create_mask reads frame_shape, so it must run after it is set.
        self._mask = self._create_mask(config.get('mask'))
        self._best_image_timeout = config['best_image_timeout']
        self._zones = { name: ZoneConfig(name, z) for name, z in config['zones'].items() }
        self._save_clips = CameraSaveClipsConfig(config['save_clips'])
        self._snapshots = CameraSnapshotsConfig(config['snapshots'])
        self._objects = ObjectConfig(global_objects, config.get('objects', {}))
        # Needs ffmpeg, fps, save_clips and name, all assigned above.
        self._ffmpeg_cmd = self._get_ffmpeg_cmd(cache_dir)
        self._set_zone_colors(self._zones)

    def _create_mask(self, mask):
        """Build the grayscale mask image for this camera.

        ``mask`` may be ``'base64,<data>'`` (an encoded image),
        ``'poly,x1,y1,x2,y2,...'`` (a polygon painted with 0 on a 255
        background) or a file name read from ``/config/``. When no usable
        mask results, an all-255 array of ``frame_shape`` is returned.
        """
        if mask:
            if mask.startswith('base64,'):
                img = base64.b64decode(mask[7:])
                # np.frombuffer replaces the deprecated binary mode of
                # np.fromstring for turning raw bytes into a uint8 array.
                np_img = np.frombuffer(img, dtype=np.uint8)
                mask_img = cv2.imdecode(np_img, cv2.IMREAD_GRAYSCALE)
            elif mask.startswith('poly,'):
                points = mask.split(',')[1:]
                contour = np.array([[int(points[i]), int(points[i+1])] for i in range(0, len(points), 2)])
                mask_img = np.zeros(self.frame_shape, np.uint8)
                mask_img[:] = 255
                cv2.fillPoly(mask_img, pts=[contour], color=(0))
            else:
                mask_img = cv2.imread(f"/config/{mask}", cv2.IMREAD_GRAYSCALE)
        else:
            mask_img = None

        # Fall back to an all-255 mask when nothing usable was produced.
        if mask_img is None or mask_img.size == 0:
            mask_img = np.zeros(self.frame_shape, np.uint8)
            mask_img[:] = 255

        return mask_img

    def _get_ffmpeg_cmd(self, cache_dir):
        """Assemble the ffmpeg argument list for this camera.

        The output arguments are prefixed with ``-r <fps>`` when fps is set.
        When clip saving is enabled, a segment output writing to
        ``cache_dir`` is placed before them. The command ends with ``pipe:``.
        """
        ffmpeg_output_args = self.ffmpeg.output_args
        if self.fps:
            ffmpeg_output_args = ["-r", str(self.fps)] + ffmpeg_output_args
        if self.save_clips.enabled:
            ffmpeg_output_args = [
                "-f",
                "segment",
                "-segment_time",
                "10",
                "-segment_format",
                "mp4",
                "-reset_timestamps",
                "1",
                "-strftime",
                "1",
                "-c",
                "copy",
                "-an",
                f"{os.path.join(cache_dir, self.name)}-%Y%m%d%H%M%S.mp4"
            ] + ffmpeg_output_args
        return (['ffmpeg'] +
                self.ffmpeg.global_args +
                self.ffmpeg.hwaccel_args +
                self.ffmpeg.input_args +
                ['-i', self.ffmpeg.input] +
                ffmpeg_output_args +
                ['pipe:'])

    def _set_zone_colors(self, zones: "Dict[str, ZoneConfig]"):
        """Assign each zone an RGB color tuple from the ``tab10`` colormap."""
        # Nothing to color; also avoids requesting a zero-entry colormap.
        if not zones:
            return

        # plt.get_cmap is used instead of plt.cm.get_cmap, which newer
        # Matplotlib releases deprecate.
        colors = plt.get_cmap('tab10', len(zones))
        for i, zone in enumerate(zones.values()):
            zone.color = tuple(int(round(255 * c)) for c in colors(i)[:3])

    @property
    def name(self):
        return self._name

    @property
    def ffmpeg(self):
        return self._ffmpeg

    @property
    def height(self):
        return self._height

    @property
    def width(self):
        return self._width

    @property
    def fps(self):
        return self._fps

    @property
    def mask(self):
        return self._mask

    @property
    def best_image_timeout(self):
        return self._best_image_timeout

    @property
    def mqtt(self):
        # NOTE(review): `_mqtt` is never assigned in this class, so reading
        # this property raises AttributeError. Kept as-is for interface
        # compatibility.
        return self._mqtt

    @property
    def zones(self) -> "Dict[str, ZoneConfig]":
        return self._zones

    @property
    def save_clips(self):
        return self._save_clips

    @property
    def snapshots(self):
        return self._snapshots

    @property
    def objects(self):
        return self._objects

    @property
    def frame_shape(self):
        return self._frame_shape

    @property
    def frame_shape_yuv(self):
        return self._frame_shape_yuv

    @property
    def ffmpeg_cmd(self):
        return self._ffmpeg_cmd
class FrigateConfig():
    """Top-level configuration object.

    Loads the configuration from a file or an in-memory mapping, validates it
    against ``FRIGATE_CONFIG_SCHEMA``, substitutes ``FRIGATE_*`` environment
    variables, builds the per-section config objects and makes sure the clip
    directories exist.

    Args:
        config_file: Path to a ``.yml``/``.yaml`` or ``.json`` file. Takes
            precedence over ``config`` when both are given.
        config: Already-parsed configuration mapping.

    Raises:
        ValueError: If neither argument is given, or the file extension is
            not supported.
    """

    def __init__(self, config_file=None, config=None):
        if config is None and config_file is None:
            raise ValueError('config or config_file must be defined')
        elif config_file is not None:
            config = self._load_file(config_file)

        config = FRIGATE_CONFIG_SCHEMA(config)
        config = self._sub_env_vars(config)

        # Read with a fallback so construction does not fail with KeyError
        # when the validated mapping carries no `web_port` entry.
        # NOTE(review): 5000 is assumed to be the intended default — confirm.
        self._web_port = config.get('web_port', 5000)
        self._detectors = { name: DetectorConfig(d) for name, d in config['detectors'].items() }
        self._mqtt = MqttConfig(config['mqtt'])
        self._save_clips = SaveClipsConfig(config['save_clips'])
        self._cameras = { name: CameraConfig(name, c, self._save_clips.cache_dir, config['ffmpeg'], config['objects']) for name, c in config['cameras'].items() }

        self._ensure_dirs()

    def _sub_env_vars(self, config):
        """Expand ``{FRIGATE_*}`` placeholders with environment values.

        Applied to the MQTT password (when present) and to every camera's
        ffmpeg input via ``str.format``. The mapping is modified in place
        and returned.
        """
        frigate_env_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}

        if 'password' in config['mqtt']:
            config['mqtt']['password'] = config['mqtt']['password'].format(**frigate_env_vars)

        for camera in config['cameras'].values():
            camera['ffmpeg']['input'] = camera['ffmpeg']['input'].format(**frigate_env_vars)

        return config

    def _ensure_dirs(self):
        """Create the cache and clips directories when they do not exist.

        A path that is a symlink is left alone even if its target is missing.
        """
        for directory in (self.save_clips.cache_dir, self.save_clips.clips_dir):
            if not os.path.exists(directory) and not os.path.islink(directory):
                os.makedirs(directory)

    def _load_file(self, config_file):
        """Read and parse a YAML (``.yml``/``.yaml``) or JSON config file.

        Raises:
            ValueError: If the file extension is not one of the supported ones.
        """
        with open(config_file) as f:
            raw_config = f.read()

        if config_file.endswith((".yml", ".yaml")):
            return yaml.safe_load(raw_config)
        if config_file.endswith(".json"):
            return json.loads(raw_config)

        # Previously this fell through with `config` unbound.
        raise ValueError(
            f"Unsupported config file extension for {config_file}; expected .yml, .yaml or .json"
        )

    @property
    def web_port(self):
        return self._web_port

    @property
    def detectors(self) -> "Dict[str, DetectorConfig]":
        return self._detectors

    @property
    def mqtt(self):
        return self._mqtt

    @property
    def save_clips(self):
        return self._save_clips

    @property
    def cameras(self) -> "Dict[str, CameraConfig]":
        return self._cameras
|