app.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. import json
  2. import logging
  3. import multiprocessing as mp
  4. import os
  5. import signal
  6. import sys
  7. import threading
  8. from logging.handlers import QueueHandler
  9. from typing import Dict, List
  10. import yaml
  11. from peewee_migrate import Router
  12. from playhouse.sqlite_ext import SqliteExtDatabase
  13. from playhouse.sqliteq import SqliteQueueDatabase
  14. from frigate.config import DetectorTypeEnum, FrigateConfig
  15. from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
  16. from frigate.edgetpu import EdgeTPUProcess
  17. from frigate.events import EventCleanup, EventProcessor
  18. from frigate.http import create_app
  19. from frigate.log import log_process, root_configurer
  20. from frigate.models import Event, Recordings
  21. from frigate.mqtt import create_mqtt_client, MqttSocketRelay
  22. from frigate.object_processing import TrackedObjectProcessor
  23. from frigate.output import output_frames
  24. from frigate.record import RecordingCleanup, RecordingMaintainer
  25. from frigate.stats import StatsEmitter, stats_init
  26. from frigate.video import capture_camera, track_camera
  27. from frigate.watchdog import FrigateWatchdog
  28. from frigate.zeroconf import broadcast_zeroconf
  29. logger = logging.getLogger(__name__)
  30. class FrigateApp:
  31. def __init__(self):
  32. self.stop_event = mp.Event()
  33. self.base_config: FrigateConfig = None
  34. self.config: FrigateConfig = None
  35. self.detection_queue = mp.Queue()
  36. self.detectors: Dict[str, EdgeTPUProcess] = {}
  37. self.detection_out_events: Dict[str, mp.Event] = {}
  38. self.detection_shms: List[mp.shared_memory.SharedMemory] = []
  39. self.log_queue = mp.Queue()
  40. self.camera_metrics = {}
  41. def set_environment_vars(self):
  42. for key, value in self.config.environment_vars.items():
  43. os.environ[key] = value
  44. def ensure_dirs(self):
  45. for d in [RECORD_DIR, CLIPS_DIR, CACHE_DIR]:
  46. if not os.path.exists(d) and not os.path.islink(d):
  47. logger.info(f"Creating directory: {d}")
  48. os.makedirs(d)
  49. else:
  50. logger.debug(f"Skipping directory: {d}")
  51. def init_logger(self):
  52. self.log_process = mp.Process(
  53. target=log_process, args=(self.log_queue,), name="log_process"
  54. )
  55. self.log_process.daemon = True
  56. self.log_process.start()
  57. root_configurer(self.log_queue)
  58. def init_config(self):
  59. config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
  60. user_config = FrigateConfig.parse_file(config_file)
  61. self.config = user_config.runtime_config
  62. for camera_name in self.config.cameras.keys():
  63. # create camera_metrics
  64. self.camera_metrics[camera_name] = {
  65. "camera_fps": mp.Value("d", 0.0),
  66. "skipped_fps": mp.Value("d", 0.0),
  67. "process_fps": mp.Value("d", 0.0),
  68. "detection_enabled": mp.Value(
  69. "i", self.config.cameras[camera_name].detect.enabled
  70. ),
  71. "detection_fps": mp.Value("d", 0.0),
  72. "detection_frame": mp.Value("d", 0.0),
  73. "read_start": mp.Value("d", 0.0),
  74. "ffmpeg_pid": mp.Value("i", 0),
  75. "frame_queue": mp.Queue(maxsize=2),
  76. }
  77. def check_config(self):
  78. for name, camera in self.config.cameras.items():
  79. assigned_roles = list(
  80. set([r for i in camera.ffmpeg.inputs for r in i.roles])
  81. )
  82. if not camera.clips.enabled and "clips" in assigned_roles:
  83. logger.warning(
  84. f"Camera {name} has clips assigned to an input, but clips is not enabled."
  85. )
  86. elif camera.clips.enabled and not "clips" in assigned_roles:
  87. logger.warning(
  88. f"Camera {name} has clips enabled, but clips is not assigned to an input."
  89. )
  90. if not camera.record.enabled and "record" in assigned_roles:
  91. logger.warning(
  92. f"Camera {name} has record assigned to an input, but record is not enabled."
  93. )
  94. elif camera.record.enabled and not "record" in assigned_roles:
  95. logger.warning(
  96. f"Camera {name} has record enabled, but record is not assigned to an input."
  97. )
  98. if not camera.rtmp.enabled and "rtmp" in assigned_roles:
  99. logger.warning(
  100. f"Camera {name} has rtmp assigned to an input, but rtmp is not enabled."
  101. )
  102. elif camera.rtmp.enabled and not "rtmp" in assigned_roles:
  103. logger.warning(
  104. f"Camera {name} has rtmp enabled, but rtmp is not assigned to an input."
  105. )
  106. def set_log_levels(self):
  107. logging.getLogger().setLevel(self.config.logger.default.value.upper())
  108. for log, level in self.config.logger.logs.items():
  109. logging.getLogger(log).setLevel(level.value.upper())
  110. if not "werkzeug" in self.config.logger.logs:
  111. logging.getLogger("werkzeug").setLevel("ERROR")
  112. def init_queues(self):
  113. # Queues for clip processing
  114. self.event_queue = mp.Queue()
  115. self.event_processed_queue = mp.Queue()
  116. self.video_output_queue = mp.Queue(maxsize=len(self.config.cameras.keys()) * 2)
  117. # Queue for cameras to push tracked objects to
  118. self.detected_frames_queue = mp.Queue(
  119. maxsize=len(self.config.cameras.keys()) * 2
  120. )
  121. def init_database(self):
  122. # Migrate DB location
  123. old_db_path = os.path.join(CLIPS_DIR, "frigate.db")
  124. if not os.path.isfile(self.config.database.path) and os.path.isfile(
  125. old_db_path
  126. ):
  127. os.rename(old_db_path, self.config.database.path)
  128. # Migrate DB schema
  129. migrate_db = SqliteExtDatabase(self.config.database.path)
  130. # Run migrations
  131. del logging.getLogger("peewee_migrate").handlers[:]
  132. router = Router(migrate_db)
  133. router.run()
  134. migrate_db.close()
  135. self.db = SqliteQueueDatabase(self.config.database.path)
  136. models = [Event, Recordings]
  137. self.db.bind(models)
  138. def init_stats(self):
  139. self.stats_tracking = stats_init(self.camera_metrics, self.detectors)
  140. def init_web_server(self):
  141. self.flask_app = create_app(
  142. self.config,
  143. self.db,
  144. self.stats_tracking,
  145. self.detected_frames_processor,
  146. )
  147. def init_mqtt(self):
  148. self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics)
  149. def start_mqtt_relay(self):
  150. self.mqtt_relay = MqttSocketRelay(
  151. self.mqtt_client, self.config.mqtt.topic_prefix
  152. )
  153. self.mqtt_relay.start()
  154. def start_detectors(self):
  155. model_shape = (self.config.model.height, self.config.model.width)
  156. for name in self.config.cameras.keys():
  157. self.detection_out_events[name] = mp.Event()
  158. try:
  159. shm_in = mp.shared_memory.SharedMemory(
  160. name=name,
  161. create=True,
  162. size=self.config.model.height * self.config.model.width * 3,
  163. )
  164. except FileExistsError:
  165. shm_in = mp.shared_memory.SharedMemory(name=name)
  166. try:
  167. shm_out = mp.shared_memory.SharedMemory(
  168. name=f"out-{name}", create=True, size=20 * 6 * 4
  169. )
  170. except FileExistsError:
  171. shm_out = mp.shared_memory.SharedMemory(name=f"out-{name}")
  172. self.detection_shms.append(shm_in)
  173. self.detection_shms.append(shm_out)
  174. for name, detector in self.config.detectors.items():
  175. if detector.type == DetectorTypeEnum.cpu:
  176. self.detectors[name] = EdgeTPUProcess(
  177. name,
  178. self.detection_queue,
  179. self.detection_out_events,
  180. model_shape,
  181. "cpu",
  182. detector.num_threads,
  183. )
  184. if detector.type == DetectorTypeEnum.edgetpu:
  185. self.detectors[name] = EdgeTPUProcess(
  186. name,
  187. self.detection_queue,
  188. self.detection_out_events,
  189. model_shape,
  190. detector.device,
  191. detector.num_threads,
  192. )
  193. def start_detected_frames_processor(self):
  194. self.detected_frames_processor = TrackedObjectProcessor(
  195. self.config,
  196. self.mqtt_client,
  197. self.config.mqtt.topic_prefix,
  198. self.detected_frames_queue,
  199. self.event_queue,
  200. self.event_processed_queue,
  201. self.video_output_queue,
  202. self.stop_event,
  203. )
  204. self.detected_frames_processor.start()
  205. def start_video_output_processor(self):
  206. output_processor = mp.Process(
  207. target=output_frames,
  208. name=f"output_processor",
  209. args=(
  210. self.config,
  211. self.video_output_queue,
  212. ),
  213. )
  214. output_processor.daemon = True
  215. self.output_processor = output_processor
  216. output_processor.start()
  217. logger.info(f"Output process started: {output_processor.pid}")
  218. def start_camera_processors(self):
  219. model_shape = (self.config.model.height, self.config.model.width)
  220. for name, config in self.config.cameras.items():
  221. camera_process = mp.Process(
  222. target=track_camera,
  223. name=f"camera_processor:{name}",
  224. args=(
  225. name,
  226. config,
  227. model_shape,
  228. self.config.model.merged_labelmap,
  229. self.detection_queue,
  230. self.detection_out_events[name],
  231. self.detected_frames_queue,
  232. self.camera_metrics[name],
  233. ),
  234. )
  235. camera_process.daemon = True
  236. self.camera_metrics[name]["process"] = camera_process
  237. camera_process.start()
  238. logger.info(f"Camera processor started for {name}: {camera_process.pid}")
  239. def start_camera_capture_processes(self):
  240. for name, config in self.config.cameras.items():
  241. capture_process = mp.Process(
  242. target=capture_camera,
  243. name=f"camera_capture:{name}",
  244. args=(name, config, self.camera_metrics[name]),
  245. )
  246. capture_process.daemon = True
  247. self.camera_metrics[name]["capture_process"] = capture_process
  248. capture_process.start()
  249. logger.info(f"Capture process started for {name}: {capture_process.pid}")
  250. def start_event_processor(self):
  251. self.event_processor = EventProcessor(
  252. self.config,
  253. self.camera_metrics,
  254. self.event_queue,
  255. self.event_processed_queue,
  256. self.stop_event,
  257. )
  258. self.event_processor.start()
  259. def start_event_cleanup(self):
  260. self.event_cleanup = EventCleanup(self.config, self.stop_event)
  261. self.event_cleanup.start()
  262. def start_recording_maintainer(self):
  263. self.recording_maintainer = RecordingMaintainer(self.config, self.stop_event)
  264. self.recording_maintainer.start()
  265. def start_recording_cleanup(self):
  266. self.recording_cleanup = RecordingCleanup(self.config, self.stop_event)
  267. self.recording_cleanup.start()
  268. def start_stats_emitter(self):
  269. self.stats_emitter = StatsEmitter(
  270. self.config,
  271. self.stats_tracking,
  272. self.mqtt_client,
  273. self.config.mqtt.topic_prefix,
  274. self.stop_event,
  275. )
  276. self.stats_emitter.start()
  277. def start_watchdog(self):
  278. self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event)
  279. self.frigate_watchdog.start()
  280. def start(self):
  281. self.init_logger()
  282. try:
  283. try:
  284. self.init_config()
  285. except Exception as e:
  286. print(f"Error parsing config: {e}")
  287. self.log_process.terminate()
  288. sys.exit(1)
  289. self.set_environment_vars()
  290. self.ensure_dirs()
  291. self.check_config()
  292. self.set_log_levels()
  293. self.init_queues()
  294. self.init_database()
  295. self.init_mqtt()
  296. except Exception as e:
  297. print(e)
  298. self.log_process.terminate()
  299. sys.exit(1)
  300. self.start_detectors()
  301. self.start_video_output_processor()
  302. self.start_detected_frames_processor()
  303. self.start_camera_processors()
  304. self.start_camera_capture_processes()
  305. self.init_stats()
  306. self.init_web_server()
  307. self.start_mqtt_relay()
  308. self.start_event_processor()
  309. self.start_event_cleanup()
  310. self.start_recording_maintainer()
  311. self.start_recording_cleanup()
  312. self.start_stats_emitter()
  313. self.start_watchdog()
  314. # self.zeroconf = broadcast_zeroconf(self.config.mqtt.client_id)
  315. def receiveSignal(signalNumber, frame):
  316. self.stop()
  317. sys.exit()
  318. signal.signal(signal.SIGTERM, receiveSignal)
  319. try:
  320. self.flask_app.run(host="127.0.0.1", port=5001, debug=False)
  321. except KeyboardInterrupt:
  322. pass
  323. self.stop()
  324. def stop(self):
  325. logger.info(f"Stopping...")
  326. self.stop_event.set()
  327. self.mqtt_relay.stop()
  328. self.detected_frames_processor.join()
  329. self.event_processor.join()
  330. self.event_cleanup.join()
  331. self.recording_maintainer.join()
  332. self.recording_cleanup.join()
  333. self.stats_emitter.join()
  334. self.frigate_watchdog.join()
  335. self.db.stop()
  336. for detector in self.detectors.values():
  337. detector.stop()
  338. while len(self.detection_shms) > 0:
  339. shm = self.detection_shms.pop()
  340. shm.close()
  341. shm.unlink()