# detect_objects.py
  1. import faulthandler; faulthandler.enable()
  2. import os
  3. import signal
  4. import sys
  5. import traceback
  6. import signal
  7. import cv2
  8. import time
  9. import datetime
  10. import queue
  11. import yaml
  12. import json
  13. import threading
  14. import multiprocessing as mp
  15. import subprocess as sp
  16. import numpy as np
  17. import logging
  18. from flask import Flask, Response, make_response, jsonify, request
  19. import paho.mqtt.client as mqtt
  20. from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
  21. from frigate.object_processing import TrackedObjectProcessor
  22. from frigate.events import EventProcessor
  23. from frigate.util import EventsPerSecond
  24. from frigate.edgetpu import EdgeTPUProcess
  25. FRIGATE_VARS = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
  26. CONFIG_FILE = os.environ.get('CONFIG_FILE', '/config/config.yml')
  27. if CONFIG_FILE.endswith(".yml"):
  28. with open(CONFIG_FILE) as f:
  29. CONFIG = yaml.safe_load(f)
  30. elif CONFIG_FILE.endswith(".json"):
  31. with open(CONFIG_FILE) as f:
  32. CONFIG = json.load(f)
  33. CACHE_DIR = CONFIG.get('save_clips', {}).get('cache_dir', '/cache')
  34. CLIPS_DIR = CONFIG.get('save_clips', {}).get('clips_dir', '/cache')
  35. if not os.path.exists(CACHE_DIR):
  36. os.makedirs(CACHE_DIR)
  37. if not os.path.exists(CLIPS_DIR):
  38. os.makedirs(CLIPS_DIR)
  39. MQTT_HOST = CONFIG['mqtt']['host']
  40. MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883)
  41. MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate')
  42. MQTT_USER = CONFIG.get('mqtt', {}).get('user')
  43. MQTT_PASS = CONFIG.get('mqtt', {}).get('password')
  44. if not MQTT_PASS is None:
  45. MQTT_PASS = MQTT_PASS.format(**FRIGATE_VARS)
  46. MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate')
  47. # Set the default FFmpeg config
  48. FFMPEG_CONFIG = CONFIG.get('ffmpeg', {})
  49. FFMPEG_DEFAULT_CONFIG = {
  50. 'global_args': FFMPEG_CONFIG.get('global_args',
  51. ['-hide_banner','-loglevel','panic']),
  52. 'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args',
  53. []),
  54. 'input_args': FFMPEG_CONFIG.get('input_args',
  55. ['-avoid_negative_ts', 'make_zero',
  56. '-fflags', 'nobuffer',
  57. '-flags', 'low_delay',
  58. '-strict', 'experimental',
  59. '-fflags', '+genpts+discardcorrupt',
  60. '-rtsp_transport', 'tcp',
  61. '-stimeout', '5000000',
  62. '-use_wallclock_as_timestamps', '1']),
  63. 'output_args': FFMPEG_CONFIG.get('output_args',
  64. ['-f', 'rawvideo',
  65. '-pix_fmt', 'yuv420p'])
  66. }
  67. GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})
  68. WEB_PORT = CONFIG.get('web_port', 5000)
  69. DETECTORS = CONFIG.get('detectors', {'coral': {'type': 'edgetpu', 'device': 'usb'}})
class CameraWatchdog(threading.Thread):
    """Background thread that supervises the detector and camera processes.

    Every 10 seconds it checks that each detection process and each camera's
    track process / capture thread / ffmpeg process is alive and making
    progress, restarting whichever component appears stuck or dead.
    """

    def __init__(self, camera_processes, config, detectors, detection_queue, tracked_objects_queue, stop_event):
        threading.Thread.__init__(self)
        # dict of per-camera state dicts built in main() (process, queues, fps counters, ...)
        self.camera_processes = camera_processes
        # per-camera config dicts (CONFIG['cameras'])
        self.config = config
        # dict of EdgeTPUProcess instances keyed by detector name
        self.detectors = detectors
        self.detection_queue = detection_queue
        self.tracked_objects_queue = tracked_objects_queue
        # threading.Event set at shutdown; checked each loop iteration
        self.stop_event = stop_event

    def run(self):
        # initial grace period so freshly-started processes aren't restarted
        time.sleep(10)
        while True:
            # wait a bit before checking
            time.sleep(10)

            if self.stop_event.is_set():
                print(f"Exiting watchdog...")
                break

            now = datetime.datetime.now().timestamp()

            # check the detection processes: a detection_start timestamp more
            # than 10s old means an inference call never returned (stuck).
            for detector in self.detectors.values():
                detection_start = detector.detection_start.value
                if (detection_start > 0.0 and
                    now - detection_start > 10):
                    print("Detection appears to be stuck. Restarting detection process")
                    detector.start_or_restart()
                elif not detector.detect_process.is_alive():
                    print("Detection appears to have stopped. Restarting detection process")
                    detector.start_or_restart()

            # check the camera processes
            for name, camera_process in self.camera_processes.items():
                process = camera_process['process']
                if not process.is_alive():
                    print(f"Track process for {name} is not alive. Starting again...")
                    # reset the shared counters before relaunching
                    camera_process['process_fps'].value = 0.0
                    camera_process['detection_fps'].value = 0.0
                    camera_process['read_start'].value = 0.0
                    # NOTE(review): this passes 10 args to track_camera, but the
                    # launch in main() passes 12 (it also includes out_events[name]
                    # after detection_queue and detection_frame before stop_event).
                    # A restarted process will likely fail with a TypeError —
                    # confirm track_camera's signature; fixing requires handing
                    # out_events to this watchdog.
                    process = mp.Process(target=track_camera, args=(name, self.config[name], camera_process['frame_queue'],
                        camera_process['frame_shape'], self.detection_queue, self.tracked_objects_queue,
                        camera_process['process_fps'], camera_process['detection_fps'],
                        camera_process['read_start'], self.stop_event))
                    process.daemon = True
                    camera_process['process'] = process
                    process.start()
                    print(f"Track process started for {name}: {process.pid}")
                if not camera_process['capture_thread'].is_alive():
                    # capture thread died: relaunch ffmpeg and a new capture thread
                    frame_shape = camera_process['frame_shape']
                    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
                    ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
                    camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
                        camera_process['take_frame'], camera_process['camera_fps'], self.stop_event)
                    camera_capture.start()
                    camera_process['ffmpeg_process'] = ffmpeg_process
                    camera_process['capture_thread'] = camera_capture
                elif now - camera_process['capture_thread'].current_frame.value > 5:
                    # capture thread is alive but no frame in 5s: kill ffmpeg so the
                    # capture thread exits and the branch above restarts both next pass
                    print(f"No frames received from {name} in 5 seconds. Exiting ffmpeg...")
                    ffmpeg_process = camera_process['ffmpeg_process']
                    ffmpeg_process.terminate()
                    try:
                        print("Waiting for ffmpeg to exit gracefully...")
                        ffmpeg_process.communicate(timeout=30)
                    except sp.TimeoutExpired:
                        print("FFmpeg didnt exit. Force killing...")
                        ffmpeg_process.kill()
                        ffmpeg_process.communicate()
def main() -> None:
    """Start the full frigate pipeline and serve the Flask debug/stream API.

    Wires together: MQTT client, per-camera ffmpeg capture + tracking
    processes, detector processes, event/object processors, a watchdog,
    signal-based shutdown, and an HTTP app (blocking in app.run).
    """
    stop_event = threading.Event()

    # connect to mqtt and setup last will
    def on_connect(client, userdata, flags, rc):
        """Log connection failures and announce availability on success path."""
        print("On connect called")
        if rc != 0:
            if rc == 3:
                print ("MQTT Server unavailable")
            elif rc == 4:
                print ("MQTT Bad username or password")
            elif rc == 5:
                print ("MQTT Not authorized")
            else:
                print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc))
        # publish a message to signal that the service is running
        client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)

    client = mqtt.Client(client_id=MQTT_CLIENT_ID)
    client.on_connect = on_connect
    # last will flips availability to offline if this process dies uncleanly
    client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
    if not MQTT_USER is None:
        client.username_pw_set(MQTT_USER, password=MQTT_PASS)
    client.connect(MQTT_HOST, MQTT_PORT, 60)
    client.loop_start()

    ##
    # Setup config defaults for cameras
    ##
    for name, config in CONFIG['cameras'].items():
        config['snapshots'] = {
            'show_timestamp': config.get('snapshots', {}).get('show_timestamp', True),
            'draw_zones': config.get('snapshots', {}).get('draw_zones', False),
            'draw_bounding_boxes': config.get('snapshots', {}).get('draw_bounding_boxes', True)
        }
        config['zones'] = config.get('zones', {})

    # Queue for cameras to push tracked objects to
    tracked_objects_queue = mp.Queue()

    # Queue for clip processing
    event_queue = mp.Queue()

    # create the detection pipes and shms: one input (300x300 RGB frame) and
    # one output (20 detections x 6 floats) shared-memory block per camera
    out_events = {}
    camera_shms = []
    for name in CONFIG['cameras'].keys():
        out_events[name] = mp.Event()
        shm_in = mp.shared_memory.SharedMemory(name=name, create=True, size=300*300*3)
        shm_out = mp.shared_memory.SharedMemory(name=f"out-{name}", create=True, size=20*6*4)
        camera_shms.append(shm_in)
        camera_shms.append(shm_out)

    detection_queue = mp.Queue()

    detectors = {}
    for name, detector in DETECTORS.items():
        if detector['type'] == 'cpu':
            detectors[name] = EdgeTPUProcess(detection_queue, out_events=out_events, tf_device='cpu')
        if detector['type'] == 'edgetpu':
            detectors[name] = EdgeTPUProcess(detection_queue, out_events=out_events, tf_device=detector['device'])

    # create the camera processes
    camera_processes = {}
    for name, config in CONFIG['cameras'].items():
        # Merge the ffmpeg config with the global config
        ffmpeg = config.get('ffmpeg', {})
        ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
        ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
        ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
        ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
        ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
        if not config.get('fps') is None:
            ffmpeg_output_args = ["-r", str(config.get('fps'))] + ffmpeg_output_args
        if config.get('save_clips', {}).get('enabled', False):
            # prepend a second output that writes 10s mp4 segments to the cache
            # dir (consumed by EventProcessor to assemble clips)
            ffmpeg_output_args = [
                "-f",
                "segment",
                "-segment_time",
                "10",
                "-segment_format",
                "mp4",
                "-reset_timestamps",
                "1",
                "-strftime",
                "1",
                "-c",
                "copy",
                "-an",
                "-map",
                "0",
                f"{os.path.join(CACHE_DIR, name)}-%Y%m%d%H%M%S.mp4"
            ] + ffmpeg_output_args
        ffmpeg_cmd = (['ffmpeg'] +
                ffmpeg_global_args +
                ffmpeg_hwaccel_args +
                ffmpeg_input_args +
                ['-i', ffmpeg_input] +
                ffmpeg_output_args +
                ['pipe:'])

        # frame shape from config if given, otherwise probe the stream
        if 'width' in config and 'height' in config:
            frame_shape = (config['height'], config['width'], 3)
        else:
            frame_shape = get_frame_shape(ffmpeg_input)

        config['frame_shape'] = frame_shape

        frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
        take_frame = config.get('take_frame', 1)

        # shared timestamp of the frame currently being processed for detection
        detection_frame = mp.Value('d', 0.0)

        ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
        # small maxsize so capture drops frames rather than buffering stale ones
        frame_queue = mp.Queue(maxsize=2)
        camera_fps = EventsPerSecond()
        camera_fps.start()
        camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, stop_event)
        camera_capture.start()

        camera_processes[name] = {
            'camera_fps': camera_fps,
            'take_frame': take_frame,
            'process_fps': mp.Value('d', 0.0),
            'detection_fps': mp.Value('d', 0.0),
            'detection_frame': detection_frame,
            'read_start': mp.Value('d', 0.0),
            'ffmpeg_process': ffmpeg_process,
            'ffmpeg_cmd': ffmpeg_cmd,
            'frame_queue': frame_queue,
            'frame_shape': frame_shape,
            'capture_thread': camera_capture
        }

        # merge global object config into camera object config
        camera_objects_config = config.get('objects', {})
        # get objects to track for camera
        objects_to_track = camera_objects_config.get('track', GLOBAL_OBJECT_CONFIG.get('track', ['person']))
        # get object filters
        object_filters = camera_objects_config.get('filters', GLOBAL_OBJECT_CONFIG.get('filters', {}))
        config['objects'] = {
            'track': objects_to_track,
            'filters': object_filters
        }

        camera_process = mp.Process(target=track_camera, args=(name, config, frame_queue, frame_shape,
            detection_queue, out_events[name], tracked_objects_queue, camera_processes[name]['process_fps'],
            camera_processes[name]['detection_fps'],
            camera_processes[name]['read_start'], camera_processes[name]['detection_frame'], stop_event))
        camera_process.daemon = True
        camera_processes[name]['process'] = camera_process

    # start the camera_processes
    for name, camera_process in camera_processes.items():
        camera_process['process'].start()
        print(f"Camera_process started for {name}: {camera_process['process'].pid}")

    event_processor = EventProcessor(CONFIG, camera_processes, CACHE_DIR, CLIPS_DIR, event_queue, stop_event)
    event_processor.start()

    object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue, stop_event)
    object_processor.start()

    camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], detectors, detection_queue, tracked_objects_queue, stop_event)
    camera_watchdog.start()

    def receiveSignal(signalNumber, frame):
        """SIGTERM/SIGINT handler: orderly shutdown, then exit the process."""
        print('Received:', signalNumber)
        stop_event.set()
        event_processor.join()
        object_processor.join()
        camera_watchdog.join()
        for camera_name, camera_process in camera_processes.items():
            camera_process['capture_thread'].join()
            # cleanup the frame queue: unlink the per-frame shared memory
            # blocks named "<camera><frame_time>" that are still pending
            while not camera_process['frame_queue'].empty():
                frame_time = camera_process['frame_queue'].get()
                shm = mp.shared_memory.SharedMemory(name=f"{camera_name}{frame_time}")
                shm.close()
                shm.unlink()
        for detector in detectors.values():
            detector.stop()
        for shm in camera_shms:
            shm.close()
            shm.unlink()
        sys.exit()

    signal.signal(signal.SIGTERM, receiveSignal)
    signal.signal(signal.SIGINT, receiveSignal)

    # create a flask app that encodes frames a mjpeg on demand
    app = Flask(__name__)
    log = logging.getLogger('werkzeug')
    log.setLevel(logging.ERROR)

    @app.route('/')
    def ishealthy():
        # health-check endpoint
        return "Frigate is running. Alive and healthy!"

    @app.route('/debug/stack')
    def processor_stack():
        """Return the current stack of the object processor thread as HTML."""
        frame = sys._current_frames().get(object_processor.ident, None)
        if frame:
            return "<br>".join(traceback.format_stack(frame)), 200
        else:
            return "no frame found", 200

    @app.route('/debug/print_stack')
    def print_stack():
        """Send SIGUSR1 to ?pid= so faulthandler dumps its stack to the logs."""
        pid = int(request.args.get('pid', 0))
        if pid == 0:
            return "missing pid", 200
        else:
            os.kill(pid, signal.SIGUSR1)
            return "check logs", 200

    @app.route('/debug/stats')
    def stats():
        """Return per-camera and per-detector runtime stats as JSON."""
        stats = {}
        total_detection_fps = 0

        for name, camera_stats in camera_processes.items():
            total_detection_fps += camera_stats['detection_fps'].value
            capture_thread = camera_stats['capture_thread']
            stats[name] = {
                'camera_fps': round(capture_thread.fps.eps(), 2),
                'process_fps': round(camera_stats['process_fps'].value, 2),
                'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
                'detection_fps': round(camera_stats['detection_fps'].value, 2),
                'read_start': camera_stats['read_start'].value,
                'pid': camera_stats['process'].pid,
                'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
                'frame_info': {
                    'read': capture_thread.current_frame.value,
                    'detect': camera_stats['detection_frame'].value,
                    'process': object_processor.camera_data[name]['current_frame_time']
                }
            }

        stats['detectors'] = {}
        for name, detector in detectors.items():
            stats['detectors'][name] = {
                'inference_speed': round(detector.avg_inference_speed.value*1000, 2),
                'detection_start': detector.detection_start.value,
                'pid': detector.detect_process.pid
            }
        stats['detection_fps'] = round(total_detection_fps, 2)

        return jsonify(stats)

    @app.route('/<camera_name>/<label>/best.jpg')
    def best(camera_name, label):
        """Return a JPEG of the best recent frame for a label, optionally cropped/resized."""
        if camera_name in CONFIG['cameras']:
            best_object = object_processor.get_best(camera_name, label)
            best_frame = best_object.get('frame')
            if best_frame is None:
                # no candidate yet: serve a black placeholder image
                best_frame = np.zeros((720,1280,3), np.uint8)
            else:
                best_frame = cv2.cvtColor(best_frame, cv2.COLOR_YUV2BGR_I420)

            crop = bool(request.args.get('crop', 0, type=int))
            if crop:
                region = best_object.get('region', [0,0,300,300])
                best_frame = best_frame[region[1]:region[3], region[0]:region[2]]

            # resize to ?h= height, preserving aspect ratio
            height = int(request.args.get('h', str(best_frame.shape[0])))
            width = int(height*best_frame.shape[1]/best_frame.shape[0])

            best_frame = cv2.resize(best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
            ret, jpg = cv2.imencode('.jpg', best_frame)
            response = make_response(jpg.tobytes())
            response.headers['Content-Type'] = 'image/jpg'
            return response
        else:
            return "Camera named {} not found".format(camera_name), 404

    @app.route('/<camera_name>')
    def mjpeg_feed(camera_name):
        """Stream the camera as multipart MJPEG at ?fps= and ?h=."""
        fps = int(request.args.get('fps', '3'))
        height = int(request.args.get('h', '360'))
        if camera_name in CONFIG['cameras']:
            # return a multipart response
            return Response(imagestream(camera_name, fps, height),
                            mimetype='multipart/x-mixed-replace; boundary=frame')
        else:
            return "Camera named {} not found".format(camera_name), 404

    @app.route('/<camera_name>/latest.jpg')
    def latest_frame(camera_name):
        """Return the most recent frame for the camera as a JPEG."""
        if camera_name in CONFIG['cameras']:
            # max out at specified FPS
            frame = object_processor.get_current_frame(camera_name)
            if frame is None:
                frame = np.zeros((720,1280,3), np.uint8)

            height = int(request.args.get('h', str(frame.shape[0])))
            width = int(height*frame.shape[1]/frame.shape[0])

            frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)

            ret, jpg = cv2.imencode('.jpg', frame)
            response = make_response(jpg.tobytes())
            response.headers['Content-Type'] = 'image/jpg'
            return response
        else:
            return "Camera named {} not found".format(camera_name), 404

    def imagestream(camera_name, fps, height):
        """Generator yielding annotated JPEG frames for the MJPEG feed."""
        while True:
            # max out at specified FPS
            time.sleep(1/fps)
            frame = object_processor.get_current_frame(camera_name, draw=True)
            if frame is None:
                # placeholder with a 16:9 aspect ratio at the requested height
                frame = np.zeros((height,int(height*16/9),3), np.uint8)

            width = int(height*frame.shape[1]/frame.shape[0])
            frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_LINEAR)

            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    # blocks until the server exits (e.g. after a shutdown signal)
    app.run(host='0.0.0.0', port=WEB_PORT, debug=False)

    object_processor.join()
# Run the service only when executed as a script, not on import.
if __name__ == '__main__':
    main()