app.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. import json
  2. import logging
  3. import multiprocessing as mp
  4. import os
  5. import signal
  6. import sys
  7. import threading
  8. from logging.handlers import QueueHandler
  9. from typing import Dict, List
  10. import traceback
  11. import yaml
  12. from peewee_migrate import Router
  13. from playhouse.sqlite_ext import SqliteExtDatabase
  14. from playhouse.sqliteq import SqliteQueueDatabase
  15. from pydantic import ValidationError
  16. from frigate.config import DetectorTypeEnum, FrigateConfig
  17. from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
  18. from frigate.edgetpu import EdgeTPUProcess
  19. from frigate.events import EventCleanup, EventProcessor
  20. from frigate.http import create_app
  21. from frigate.log import log_process, root_configurer
  22. from frigate.models import Event, Recordings
  23. from frigate.mqtt import MqttSocketRelay, create_mqtt_client
  24. from frigate.object_processing import TrackedObjectProcessor
  25. from frigate.output import output_frames
  26. from frigate.record import RecordingCleanup, RecordingMaintainer
  27. from frigate.stats import StatsEmitter, stats_init
  28. from frigate.version import VERSION
  29. from frigate.video import capture_camera, track_camera
  30. from frigate.watchdog import FrigateWatchdog
  31. logger = logging.getLogger(__name__)
  32. class FrigateApp:
  33. def __init__(self):
  34. self.stop_event = mp.Event()
  35. self.base_config: FrigateConfig = None
  36. self.config: FrigateConfig = None
  37. self.detection_queue = mp.Queue()
  38. self.detectors: Dict[str, EdgeTPUProcess] = {}
  39. self.detection_out_events: Dict[str, mp.Event] = {}
  40. self.detection_shms: List[mp.shared_memory.SharedMemory] = []
  41. self.log_queue = mp.Queue()
  42. self.camera_metrics = {}
  43. def set_environment_vars(self):
  44. for key, value in self.config.environment_vars.items():
  45. os.environ[key] = value
  46. def ensure_dirs(self):
  47. for d in [RECORD_DIR, CLIPS_DIR, CACHE_DIR]:
  48. if not os.path.exists(d) and not os.path.islink(d):
  49. logger.info(f"Creating directory: {d}")
  50. os.makedirs(d)
  51. else:
  52. logger.debug(f"Skipping directory: {d}")
  53. def init_logger(self):
  54. self.log_process = mp.Process(
  55. target=log_process, args=(self.log_queue,), name="log_process"
  56. )
  57. self.log_process.daemon = True
  58. self.log_process.start()
  59. root_configurer(self.log_queue)
  60. def init_config(self):
  61. config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
  62. # Check if we can use .yaml instead of .yml
  63. config_file_yaml = config_file.replace(".yml", ".yaml")
  64. if os.path.isfile(config_file_yaml):
  65. config_file = config_file_yaml
  66. user_config = FrigateConfig.parse_file(config_file)
  67. self.config = user_config.runtime_config
  68. for camera_name in self.config.cameras.keys():
  69. # create camera_metrics
  70. self.camera_metrics[camera_name] = {
  71. "camera_fps": mp.Value("d", 0.0),
  72. "skipped_fps": mp.Value("d", 0.0),
  73. "process_fps": mp.Value("d", 0.0),
  74. "detection_enabled": mp.Value(
  75. "i", self.config.cameras[camera_name].detect.enabled
  76. ),
  77. "detection_fps": mp.Value("d", 0.0),
  78. "detection_frame": mp.Value("d", 0.0),
  79. "read_start": mp.Value("d", 0.0),
  80. "ffmpeg_pid": mp.Value("i", 0),
  81. "frame_queue": mp.Queue(maxsize=2),
  82. }
  83. def set_log_levels(self):
  84. logging.getLogger().setLevel(self.config.logger.default.value.upper())
  85. for log, level in self.config.logger.logs.items():
  86. logging.getLogger(log).setLevel(level.value.upper())
  87. if not "werkzeug" in self.config.logger.logs:
  88. logging.getLogger("werkzeug").setLevel("ERROR")
  89. def init_queues(self):
  90. # Queues for clip processing
  91. self.event_queue = mp.Queue()
  92. self.event_processed_queue = mp.Queue()
  93. self.video_output_queue = mp.Queue(maxsize=len(self.config.cameras.keys()) * 2)
  94. # Queue for cameras to push tracked objects to
  95. self.detected_frames_queue = mp.Queue(
  96. maxsize=len(self.config.cameras.keys()) * 2
  97. )
  98. # Queue for recordings info
  99. self.recordings_info_queue = mp.Queue()
  100. def init_database(self):
  101. # Migrate DB location
  102. old_db_path = os.path.join(CLIPS_DIR, "frigate.db")
  103. if not os.path.isfile(self.config.database.path) and os.path.isfile(
  104. old_db_path
  105. ):
  106. os.rename(old_db_path, self.config.database.path)
  107. # Migrate DB schema
  108. migrate_db = SqliteExtDatabase(self.config.database.path)
  109. # Run migrations
  110. del logging.getLogger("peewee_migrate").handlers[:]
  111. router = Router(migrate_db)
  112. router.run()
  113. migrate_db.close()
  114. self.db = SqliteQueueDatabase(self.config.database.path)
  115. models = [Event, Recordings]
  116. self.db.bind(models)
  117. def init_stats(self):
  118. self.stats_tracking = stats_init(self.camera_metrics, self.detectors)
  119. def init_web_server(self):
  120. self.flask_app = create_app(
  121. self.config,
  122. self.db,
  123. self.stats_tracking,
  124. self.detected_frames_processor,
  125. )
  126. def init_mqtt(self):
  127. self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics)
  128. def start_mqtt_relay(self):
  129. self.mqtt_relay = MqttSocketRelay(
  130. self.mqtt_client, self.config.mqtt.topic_prefix
  131. )
  132. self.mqtt_relay.start()
  133. def start_detectors(self):
  134. model_path = self.config.model.path
  135. model_shape = (self.config.model.height, self.config.model.width)
  136. for name in self.config.cameras.keys():
  137. self.detection_out_events[name] = mp.Event()
  138. try:
  139. shm_in = mp.shared_memory.SharedMemory(
  140. name=name,
  141. create=True,
  142. size=self.config.model.height * self.config.model.width * 3,
  143. )
  144. except FileExistsError:
  145. shm_in = mp.shared_memory.SharedMemory(name=name)
  146. try:
  147. shm_out = mp.shared_memory.SharedMemory(
  148. name=f"out-{name}", create=True, size=20 * 6 * 4
  149. )
  150. except FileExistsError:
  151. shm_out = mp.shared_memory.SharedMemory(name=f"out-{name}")
  152. self.detection_shms.append(shm_in)
  153. self.detection_shms.append(shm_out)
  154. for name, detector in self.config.detectors.items():
  155. if detector.type == DetectorTypeEnum.cpu:
  156. self.detectors[name] = EdgeTPUProcess(
  157. name,
  158. self.detection_queue,
  159. self.detection_out_events,
  160. model_path,
  161. model_shape,
  162. "cpu",
  163. detector.num_threads,
  164. )
  165. if detector.type == DetectorTypeEnum.edgetpu:
  166. self.detectors[name] = EdgeTPUProcess(
  167. name,
  168. self.detection_queue,
  169. self.detection_out_events,
  170. model_path,
  171. model_shape,
  172. detector.device,
  173. detector.num_threads,
  174. )
  175. def start_detected_frames_processor(self):
  176. self.detected_frames_processor = TrackedObjectProcessor(
  177. self.config,
  178. self.mqtt_client,
  179. self.config.mqtt.topic_prefix,
  180. self.detected_frames_queue,
  181. self.event_queue,
  182. self.event_processed_queue,
  183. self.video_output_queue,
  184. self.recordings_info_queue,
  185. self.stop_event,
  186. )
  187. self.detected_frames_processor.start()
  188. def start_video_output_processor(self):
  189. output_processor = mp.Process(
  190. target=output_frames,
  191. name=f"output_processor",
  192. args=(
  193. self.config,
  194. self.video_output_queue,
  195. ),
  196. )
  197. output_processor.daemon = True
  198. self.output_processor = output_processor
  199. output_processor.start()
  200. logger.info(f"Output process started: {output_processor.pid}")
  201. def start_camera_processors(self):
  202. model_shape = (self.config.model.height, self.config.model.width)
  203. for name, config in self.config.cameras.items():
  204. camera_process = mp.Process(
  205. target=track_camera,
  206. name=f"camera_processor:{name}",
  207. args=(
  208. name,
  209. config,
  210. model_shape,
  211. self.config.model.merged_labelmap,
  212. self.detection_queue,
  213. self.detection_out_events[name],
  214. self.detected_frames_queue,
  215. self.camera_metrics[name],
  216. ),
  217. )
  218. camera_process.daemon = True
  219. self.camera_metrics[name]["process"] = camera_process
  220. camera_process.start()
  221. logger.info(f"Camera processor started for {name}: {camera_process.pid}")
  222. def start_camera_capture_processes(self):
  223. for name, config in self.config.cameras.items():
  224. capture_process = mp.Process(
  225. target=capture_camera,
  226. name=f"camera_capture:{name}",
  227. args=(name, config, self.camera_metrics[name]),
  228. )
  229. capture_process.daemon = True
  230. self.camera_metrics[name]["capture_process"] = capture_process
  231. capture_process.start()
  232. logger.info(f"Capture process started for {name}: {capture_process.pid}")
  233. def start_event_processor(self):
  234. self.event_processor = EventProcessor(
  235. self.config,
  236. self.camera_metrics,
  237. self.event_queue,
  238. self.event_processed_queue,
  239. self.stop_event,
  240. )
  241. self.event_processor.start()
  242. def start_event_cleanup(self):
  243. self.event_cleanup = EventCleanup(self.config, self.stop_event)
  244. self.event_cleanup.start()
  245. def start_recording_maintainer(self):
  246. self.recording_maintainer = RecordingMaintainer(
  247. self.config, self.recordings_info_queue, self.stop_event
  248. )
  249. self.recording_maintainer.start()
  250. def start_recording_cleanup(self):
  251. self.recording_cleanup = RecordingCleanup(self.config, self.stop_event)
  252. self.recording_cleanup.start()
  253. def start_stats_emitter(self):
  254. self.stats_emitter = StatsEmitter(
  255. self.config,
  256. self.stats_tracking,
  257. self.mqtt_client,
  258. self.config.mqtt.topic_prefix,
  259. self.stop_event,
  260. )
  261. self.stats_emitter.start()
  262. def start_watchdog(self):
  263. self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event)
  264. self.frigate_watchdog.start()
  265. def start(self):
  266. self.init_logger()
  267. logger.info(f"Starting Frigate ({VERSION})")
  268. try:
  269. try:
  270. self.init_config()
  271. except Exception as e:
  272. print("*************************************************************")
  273. print("*************************************************************")
  274. print("*** Your config file is not valid! ***")
  275. print("*** Please check the docs at ***")
  276. print("*** https://docs.frigate.video/configuration/index ***")
  277. print("*************************************************************")
  278. print("*************************************************************")
  279. print("*** Config Validation Errors ***")
  280. print("*************************************************************")
  281. print(e)
  282. print(traceback.format_exc())
  283. print("*************************************************************")
  284. print("*** End Config Validation Errors ***")
  285. print("*************************************************************")
  286. self.log_process.terminate()
  287. sys.exit(1)
  288. self.set_environment_vars()
  289. self.ensure_dirs()
  290. self.set_log_levels()
  291. self.init_queues()
  292. self.init_database()
  293. self.init_mqtt()
  294. except Exception as e:
  295. print(e)
  296. self.log_process.terminate()
  297. sys.exit(1)
  298. self.start_detectors()
  299. self.start_video_output_processor()
  300. self.start_detected_frames_processor()
  301. self.start_camera_processors()
  302. self.start_camera_capture_processes()
  303. self.init_stats()
  304. self.init_web_server()
  305. self.start_mqtt_relay()
  306. self.start_event_processor()
  307. self.start_event_cleanup()
  308. self.start_recording_maintainer()
  309. self.start_recording_cleanup()
  310. self.start_stats_emitter()
  311. self.start_watchdog()
  312. # self.zeroconf = broadcast_zeroconf(self.config.mqtt.client_id)
  313. def receiveSignal(signalNumber, frame):
  314. self.stop()
  315. sys.exit()
  316. signal.signal(signal.SIGTERM, receiveSignal)
  317. try:
  318. self.flask_app.run(host="127.0.0.1", port=5001, debug=False)
  319. except KeyboardInterrupt:
  320. pass
  321. self.stop()
  322. def stop(self):
  323. logger.info(f"Stopping...")
  324. self.stop_event.set()
  325. self.mqtt_relay.stop()
  326. self.detected_frames_processor.join()
  327. self.event_processor.join()
  328. self.event_cleanup.join()
  329. self.recording_maintainer.join()
  330. self.recording_cleanup.join()
  331. self.stats_emitter.join()
  332. self.frigate_watchdog.join()
  333. self.db.stop()
  334. for detector in self.detectors.values():
  335. detector.stop()
  336. while len(self.detection_shms) > 0:
  337. shm = self.detection_shms.pop()
  338. shm.close()
  339. shm.unlink()