# detect_objects.py -- frigate entry point: camera capture, object
# detection, event processing, and the debug/stream web server.
  1. import os
  2. import signal
  3. import sys
  4. import traceback
  5. import signal
  6. import cv2
  7. import time
  8. import datetime
  9. import queue
  10. import yaml
  11. import threading
  12. import multiprocessing as mp
  13. import subprocess as sp
  14. import numpy as np
  15. import logging
  16. from flask import Flask, Response, make_response, jsonify, request
  17. import paho.mqtt.client as mqtt
  18. from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
  19. from frigate.object_processing import TrackedObjectProcessor
  20. from frigate.events import EventProcessor
  21. from frigate.util import EventsPerSecond
  22. from frigate.edgetpu import EdgeTPUProcess
  23. FRIGATE_VARS = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
  24. with open('/config/config.yml') as f:
  25. CONFIG = yaml.safe_load(f)
  26. MQTT_HOST = CONFIG['mqtt']['host']
  27. MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883)
  28. MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate')
  29. MQTT_USER = CONFIG.get('mqtt', {}).get('user')
  30. MQTT_PASS = CONFIG.get('mqtt', {}).get('password')
  31. if not MQTT_PASS is None:
  32. MQTT_PASS = MQTT_PASS.format(**FRIGATE_VARS)
  33. MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate')
  34. # Set the default FFmpeg config
  35. FFMPEG_CONFIG = CONFIG.get('ffmpeg', {})
  36. FFMPEG_DEFAULT_CONFIG = {
  37. 'global_args': FFMPEG_CONFIG.get('global_args',
  38. ['-hide_banner','-loglevel','panic']),
  39. 'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args',
  40. []),
  41. 'input_args': FFMPEG_CONFIG.get('input_args',
  42. ['-avoid_negative_ts', 'make_zero',
  43. '-fflags', 'nobuffer',
  44. '-flags', 'low_delay',
  45. '-strict', 'experimental',
  46. '-fflags', '+genpts+discardcorrupt',
  47. '-rtsp_transport', 'tcp',
  48. '-stimeout', '5000000',
  49. '-use_wallclock_as_timestamps', '1']),
  50. 'output_args': FFMPEG_CONFIG.get('output_args',
  51. ['-f', 'rawvideo',
  52. '-pix_fmt', 'rgb24'])
  53. }
  54. GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})
  55. WEB_PORT = CONFIG.get('web_port', 5000)
  56. DEBUG = (CONFIG.get('debug', '0') == '1')
  57. TENSORFLOW_DEVICE = CONFIG.get('tensorflow_device')
  58. def start_plasma_store():
  59. plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
  60. plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
  61. time.sleep(1)
  62. rc = plasma_process.poll()
  63. if rc is not None:
  64. return None
  65. return plasma_process
  66. class CameraWatchdog(threading.Thread):
  67. def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process, stop_event):
  68. threading.Thread.__init__(self)
  69. self.camera_processes = camera_processes
  70. self.config = config
  71. self.tflite_process = tflite_process
  72. self.tracked_objects_queue = tracked_objects_queue
  73. self.plasma_process = plasma_process
  74. self.stop_event = stop_event
  75. def run(self):
  76. time.sleep(10)
  77. while True:
  78. # wait a bit before checking
  79. time.sleep(10)
  80. if self.stop_event.is_set():
  81. print(f"Exiting watchdog...")
  82. break
  83. now = datetime.datetime.now().timestamp()
  84. # check the plasma process
  85. rc = self.plasma_process.poll()
  86. if rc != None:
  87. print(f"plasma_process exited unexpectedly with {rc}")
  88. self.plasma_process = start_plasma_store()
  89. # check the detection process
  90. detection_start = self.tflite_process.detection_start.value
  91. if (detection_start > 0.0 and
  92. now - detection_start > 10):
  93. print("Detection appears to be stuck. Restarting detection process")
  94. self.tflite_process.start_or_restart()
  95. elif not self.tflite_process.detect_process.is_alive():
  96. print("Detection appears to have stopped. Restarting detection process")
  97. self.tflite_process.start_or_restart()
  98. # check the camera processes
  99. for name, camera_process in self.camera_processes.items():
  100. process = camera_process['process']
  101. if not process.is_alive():
  102. print(f"Track process for {name} is not alive. Starting again...")
  103. camera_process['process_fps'].value = 0.0
  104. camera_process['detection_fps'].value = 0.0
  105. camera_process['read_start'].value = 0.0
  106. process = mp.Process(target=track_camera, args=(name, self.config[name], camera_process['frame_queue'],
  107. camera_process['frame_shape'], self.tflite_process.detection_queue, self.tracked_objects_queue,
  108. camera_process['process_fps'], camera_process['detection_fps'],
  109. camera_process['read_start'], camera_process['detection_frame'], self.stop_event))
  110. process.daemon = True
  111. camera_process['process'] = process
  112. process.start()
  113. print(f"Track process started for {name}: {process.pid}")
  114. if not camera_process['capture_thread'].is_alive():
  115. frame_shape = camera_process['frame_shape']
  116. frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
  117. ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
  118. camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
  119. camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'], self.stop_event)
  120. camera_capture.start()
  121. camera_process['ffmpeg_process'] = ffmpeg_process
  122. camera_process['capture_thread'] = camera_capture
  123. elif now - camera_process['capture_thread'].current_frame.value > 5:
  124. print(f"No frames received from {name} in 5 seconds. Exiting ffmpeg...")
  125. ffmpeg_process = camera_process['ffmpeg_process']
  126. ffmpeg_process.terminate()
  127. try:
  128. print("Waiting for ffmpeg to exit gracefully...")
  129. ffmpeg_process.communicate(timeout=30)
  130. except sp.TimeoutExpired:
  131. print("FFmpeg didnt exit. Force killing...")
  132. ffmpeg_process.kill()
  133. ffmpeg_process.communicate()
  134. def main():
  135. stop_event = threading.Event()
  136. # connect to mqtt and setup last will
  137. def on_connect(client, userdata, flags, rc):
  138. print("On connect called")
  139. if rc != 0:
  140. if rc == 3:
  141. print ("MQTT Server unavailable")
  142. elif rc == 4:
  143. print ("MQTT Bad username or password")
  144. elif rc == 5:
  145. print ("MQTT Not authorized")
  146. else:
  147. print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc))
  148. # publish a message to signal that the service is running
  149. client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
  150. client = mqtt.Client(client_id=MQTT_CLIENT_ID)
  151. client.on_connect = on_connect
  152. client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
  153. if not MQTT_USER is None:
  154. client.username_pw_set(MQTT_USER, password=MQTT_PASS)
  155. client.connect(MQTT_HOST, MQTT_PORT, 60)
  156. client.loop_start()
  157. plasma_process = start_plasma_store()
  158. ##
  159. # Setup config defaults for cameras
  160. ##
  161. for name, config in CONFIG['cameras'].items():
  162. config['snapshots'] = {
  163. 'show_timestamp': config.get('snapshots', {}).get('show_timestamp', True),
  164. 'draw_zones': config.get('snapshots', {}).get('draw_zones', False)
  165. }
  166. config['zones'] = {}
  167. # Queue for cameras to push tracked objects to
  168. tracked_objects_queue = mp.Queue()
  169. # Queue for clip processing
  170. event_queue = mp.Queue()
  171. # Start the shared tflite process
  172. tflite_process = EdgeTPUProcess(TENSORFLOW_DEVICE)
  173. # start the camera processes
  174. camera_processes = {}
  175. for name, config in CONFIG['cameras'].items():
  176. # Merge the ffmpeg config with the global config
  177. ffmpeg = config.get('ffmpeg', {})
  178. ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
  179. ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
  180. ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
  181. ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
  182. ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
  183. if not config.get('fps') is None:
  184. ffmpeg_output_args = ["-r", str(config.get('fps'))] + ffmpeg_output_args
  185. if config.get('save_clips', {}).get('enabled', False):
  186. ffmpeg_output_args = [
  187. "-f",
  188. "segment",
  189. "-segment_time",
  190. "10",
  191. "-segment_format",
  192. "mp4",
  193. "-reset_timestamps",
  194. "1",
  195. "-strftime",
  196. "1",
  197. "-c",
  198. "copy",
  199. "-an",
  200. "-map",
  201. "0",
  202. f"/cache/{name}-%Y%m%d%H%M%S.mp4"
  203. ] + ffmpeg_output_args
  204. ffmpeg_cmd = (['ffmpeg'] +
  205. ffmpeg_global_args +
  206. ffmpeg_hwaccel_args +
  207. ffmpeg_input_args +
  208. ['-i', ffmpeg_input] +
  209. ffmpeg_output_args +
  210. ['pipe:'])
  211. if 'width' in config and 'height' in config:
  212. frame_shape = (config['height'], config['width'], 3)
  213. else:
  214. frame_shape = get_frame_shape(ffmpeg_input)
  215. frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
  216. take_frame = config.get('take_frame', 1)
  217. detection_frame = mp.Value('d', 0.0)
  218. ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
  219. frame_queue = mp.Queue()
  220. camera_fps = EventsPerSecond()
  221. camera_fps.start()
  222. camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame, stop_event)
  223. camera_capture.start()
  224. camera_processes[name] = {
  225. 'camera_fps': camera_fps,
  226. 'take_frame': take_frame,
  227. 'process_fps': mp.Value('d', 0.0),
  228. 'detection_fps': mp.Value('d', 0.0),
  229. 'detection_frame': detection_frame,
  230. 'read_start': mp.Value('d', 0.0),
  231. 'ffmpeg_process': ffmpeg_process,
  232. 'ffmpeg_cmd': ffmpeg_cmd,
  233. 'frame_queue': frame_queue,
  234. 'frame_shape': frame_shape,
  235. 'capture_thread': camera_capture
  236. }
  237. # merge global object config into camera object config
  238. camera_objects_config = config.get('objects', {})
  239. # get objects to track for camera
  240. objects_to_track = camera_objects_config.get('track', GLOBAL_OBJECT_CONFIG.get('track', ['person']))
  241. # merge object filters
  242. global_object_filters = GLOBAL_OBJECT_CONFIG.get('filters', {})
  243. camera_object_filters = camera_objects_config.get('filters', {})
  244. objects_with_config = set().union(global_object_filters.keys(), camera_object_filters.keys())
  245. object_filters = {}
  246. for obj in objects_with_config:
  247. object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}
  248. config['objects'] = {
  249. 'track': objects_to_track,
  250. 'filters': object_filters
  251. }
  252. camera_process = mp.Process(target=track_camera, args=(name, config, frame_queue, frame_shape,
  253. tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['process_fps'],
  254. camera_processes[name]['detection_fps'],
  255. camera_processes[name]['read_start'], camera_processes[name]['detection_frame'], stop_event))
  256. camera_process.daemon = True
  257. camera_processes[name]['process'] = camera_process
  258. for name, camera_process in camera_processes.items():
  259. camera_process['process'].start()
  260. print(f"Camera_process started for {name}: {camera_process['process'].pid}")
  261. event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue, stop_event)
  262. event_processor.start()
  263. object_processor = TrackedObjectProcessor(CONFIG['cameras'], CONFIG.get('zones', {}), client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue, stop_event)
  264. object_processor.start()
  265. camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process, stop_event)
  266. camera_watchdog.start()
  267. def receiveSignal(signalNumber, frame):
  268. print('Received:', signalNumber)
  269. stop_event.set()
  270. event_processor.join()
  271. object_processor.join()
  272. camera_watchdog.join()
  273. for name, camera_process in camera_processes.items():
  274. camera_process['capture_thread'].join()
  275. rc = camera_watchdog.plasma_process.poll()
  276. if rc == None:
  277. camera_watchdog.plasma_process.terminate()
  278. sys.exit()
  279. signal.signal(signal.SIGTERM, receiveSignal)
  280. signal.signal(signal.SIGINT, receiveSignal)
  281. # create a flask app that encodes frames a mjpeg on demand
  282. app = Flask(__name__)
  283. log = logging.getLogger('werkzeug')
  284. log.setLevel(logging.ERROR)
  285. @app.route('/')
  286. def ishealthy():
  287. # return a healh
  288. return "Frigate is running. Alive and healthy!"
  289. @app.route('/debug/stack')
  290. def processor_stack():
  291. frame = sys._current_frames().get(object_processor.ident, None)
  292. if frame:
  293. return "<br>".join(traceback.format_stack(frame)), 200
  294. else:
  295. return "no frame found", 200
  296. @app.route('/debug/print_stack')
  297. def print_stack():
  298. pid = int(request.args.get('pid', 0))
  299. if pid == 0:
  300. return "missing pid", 200
  301. else:
  302. os.kill(pid, signal.SIGUSR1)
  303. return "check logs", 200
  304. @app.route('/debug/stats')
  305. def stats():
  306. stats = {}
  307. total_detection_fps = 0
  308. for name, camera_stats in camera_processes.items():
  309. total_detection_fps += camera_stats['detection_fps'].value
  310. capture_thread = camera_stats['capture_thread']
  311. stats[name] = {
  312. 'camera_fps': round(capture_thread.fps.eps(), 2),
  313. 'process_fps': round(camera_stats['process_fps'].value, 2),
  314. 'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
  315. 'detection_fps': round(camera_stats['detection_fps'].value, 2),
  316. 'read_start': camera_stats['read_start'].value,
  317. 'pid': camera_stats['process'].pid,
  318. 'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
  319. 'frame_info': {
  320. 'read': capture_thread.current_frame.value,
  321. 'detect': camera_stats['detection_frame'].value,
  322. 'process': object_processor.camera_data[name]['current_frame_time']
  323. }
  324. }
  325. stats['coral'] = {
  326. 'fps': round(total_detection_fps, 2),
  327. 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2),
  328. 'detection_start': tflite_process.detection_start.value,
  329. 'pid': tflite_process.detect_process.pid
  330. }
  331. rc = camera_watchdog.plasma_process.poll()
  332. stats['plasma_store_rc'] = rc
  333. return jsonify(stats)
  334. @app.route('/<camera_name>/<label>/best.jpg')
  335. def best(camera_name, label):
  336. if camera_name in CONFIG['cameras']:
  337. best_frame = object_processor.get_best(camera_name, label)
  338. if best_frame is None:
  339. best_frame = np.zeros((720,1280,3), np.uint8)
  340. height = int(request.args.get('h', str(best_frame.shape[0])))
  341. width = int(height*best_frame.shape[1]/best_frame.shape[0])
  342. best_frame = cv2.resize(best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
  343. best_frame = cv2.cvtColor(best_frame, cv2.COLOR_RGB2BGR)
  344. ret, jpg = cv2.imencode('.jpg', best_frame)
  345. response = make_response(jpg.tobytes())
  346. response.headers['Content-Type'] = 'image/jpg'
  347. return response
  348. else:
  349. return "Camera named {} not found".format(camera_name), 404
  350. @app.route('/<camera_name>')
  351. def mjpeg_feed(camera_name):
  352. fps = int(request.args.get('fps', '3'))
  353. height = int(request.args.get('h', '360'))
  354. if camera_name in CONFIG['cameras']:
  355. # return a multipart response
  356. return Response(imagestream(camera_name, fps, height),
  357. mimetype='multipart/x-mixed-replace; boundary=frame')
  358. else:
  359. return "Camera named {} not found".format(camera_name), 404
  360. @app.route('/<camera_name>/latest.jpg')
  361. def latest_frame(camera_name):
  362. if camera_name in CONFIG['cameras']:
  363. # max out at specified FPS
  364. frame = object_processor.get_current_frame(camera_name)
  365. if frame is None:
  366. frame = np.zeros((720,1280,3), np.uint8)
  367. height = int(request.args.get('h', str(frame.shape[0])))
  368. width = int(height*frame.shape[1]/frame.shape[0])
  369. frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
  370. frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
  371. ret, jpg = cv2.imencode('.jpg', frame)
  372. response = make_response(jpg.tobytes())
  373. response.headers['Content-Type'] = 'image/jpg'
  374. return response
  375. else:
  376. return "Camera named {} not found".format(camera_name), 404
  377. def imagestream(camera_name, fps, height):
  378. while True:
  379. # max out at specified FPS
  380. time.sleep(1/fps)
  381. frame = object_processor.get_current_frame(camera_name)
  382. if frame is None:
  383. frame = np.zeros((height,int(height*16/9),3), np.uint8)
  384. width = int(height*frame.shape[1]/frame.shape[0])
  385. frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_LINEAR)
  386. frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
  387. ret, jpg = cv2.imencode('.jpg', frame)
  388. yield (b'--frame\r\n'
  389. b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')
  390. app.run(host='0.0.0.0', port=WEB_PORT, debug=False)
  391. object_processor.join()
  392. plasma_process.terminate()
# Only start the service when executed directly (not on import).
if __name__ == '__main__':
    main()