detect_objects.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. import os
  2. import sys
  3. import traceback
  4. import signal
  5. import cv2
  6. import time
  7. import datetime
  8. import queue
  9. import yaml
  10. import threading
  11. import multiprocessing as mp
  12. import subprocess as sp
  13. import numpy as np
  14. import logging
  15. from flask import Flask, Response, make_response, jsonify, request
  16. import paho.mqtt.client as mqtt
  17. from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
  18. from frigate.object_processing import TrackedObjectProcessor
  19. from frigate.events import EventProcessor
  20. from frigate.util import EventsPerSecond
  21. from frigate.edgetpu import EdgeTPUProcess
  22. FRIGATE_VARS = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
  23. with open('/config/config.yml') as f:
  24. CONFIG = yaml.safe_load(f)
  25. MQTT_HOST = CONFIG['mqtt']['host']
  26. MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883)
  27. MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate')
  28. MQTT_USER = CONFIG.get('mqtt', {}).get('user')
  29. MQTT_PASS = CONFIG.get('mqtt', {}).get('password')
  30. if not MQTT_PASS is None:
  31. MQTT_PASS = MQTT_PASS.format(**FRIGATE_VARS)
  32. MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate')
  33. # Set the default FFmpeg config
  34. FFMPEG_CONFIG = CONFIG.get('ffmpeg', {})
  35. FFMPEG_DEFAULT_CONFIG = {
  36. 'global_args': FFMPEG_CONFIG.get('global_args',
  37. ['-hide_banner','-loglevel','panic']),
  38. 'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args',
  39. []),
  40. 'input_args': FFMPEG_CONFIG.get('input_args',
  41. ['-avoid_negative_ts', 'make_zero',
  42. '-fflags', 'nobuffer',
  43. '-flags', 'low_delay',
  44. '-strict', 'experimental',
  45. '-fflags', '+genpts+discardcorrupt',
  46. '-rtsp_transport', 'tcp',
  47. '-stimeout', '5000000',
  48. '-use_wallclock_as_timestamps', '1']),
  49. 'output_args': FFMPEG_CONFIG.get('output_args',
  50. ['-f', 'rawvideo',
  51. '-pix_fmt', 'rgb24'])
  52. }
  53. GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})
  54. WEB_PORT = CONFIG.get('web_port', 5000)
  55. DEBUG = (CONFIG.get('debug', '0') == '1')
  56. def start_plasma_store():
  57. plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
  58. plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
  59. time.sleep(1)
  60. rc = plasma_process.poll()
  61. if rc is not None:
  62. return None
  63. return plasma_process
  64. class CameraWatchdog(threading.Thread):
  65. def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process):
  66. threading.Thread.__init__(self)
  67. self.camera_processes = camera_processes
  68. self.config = config
  69. self.tflite_process = tflite_process
  70. self.tracked_objects_queue = tracked_objects_queue
  71. self.plasma_process = plasma_process
  72. def run(self):
  73. time.sleep(10)
  74. while True:
  75. # wait a bit before checking
  76. time.sleep(10)
  77. now = datetime.datetime.now().timestamp()
  78. # check the plasma process
  79. rc = self.plasma_process.poll()
  80. if rc != None:
  81. print(f"plasma_process exited unexpectedly with {rc}")
  82. self.plasma_process = start_plasma_store()
  83. # check the detection process
  84. detection_start = self.tflite_process.detection_start.value
  85. if (detection_start > 0.0 and
  86. now - detection_start > 10):
  87. print("Detection appears to be stuck. Restarting detection process")
  88. self.tflite_process.start_or_restart()
  89. elif not self.tflite_process.detect_process.is_alive():
  90. print("Detection appears to have stopped. Restarting detection process")
  91. self.tflite_process.start_or_restart()
  92. # check the camera processes
  93. for name, camera_process in self.camera_processes.items():
  94. process = camera_process['process']
  95. if not process.is_alive():
  96. print(f"Track process for {name} is not alive. Starting again...")
  97. camera_process['process_fps'].value = 0.0
  98. camera_process['detection_fps'].value = 0.0
  99. camera_process['read_start'].value = 0.0
  100. process = mp.Process(target=track_camera, args=(name, self.config[name], GLOBAL_OBJECT_CONFIG, camera_process['frame_queue'],
  101. camera_process['frame_shape'], self.tflite_process.detection_queue, self.tracked_objects_queue,
  102. camera_process['process_fps'], camera_process['detection_fps'],
  103. camera_process['read_start'], camera_process['detection_frame']))
  104. process.daemon = True
  105. camera_process['process'] = process
  106. process.start()
  107. print(f"Track process started for {name}: {process.pid}")
  108. if not camera_process['capture_thread'].is_alive():
  109. frame_shape = camera_process['frame_shape']
  110. frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
  111. ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
  112. camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
  113. camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'])
  114. camera_capture.start()
  115. camera_process['ffmpeg_process'] = ffmpeg_process
  116. camera_process['capture_thread'] = camera_capture
  117. elif now - camera_process['capture_thread'].current_frame > 5:
  118. print(f"No frames received from {name} in 5 seconds. Exiting ffmpeg...")
  119. ffmpeg_process = camera_process['ffmpeg_process']
  120. ffmpeg_process.terminate()
  121. try:
  122. print("Waiting for ffmpeg to exit gracefully...")
  123. ffmpeg_process.communicate(timeout=30)
  124. except sp.TimeoutExpired:
  125. print("FFmpeg didnt exit. Force killing...")
  126. ffmpeg_process.kill()
  127. ffmpeg_process.communicate()
  128. def main():
  129. # connect to mqtt and setup last will
  130. def on_connect(client, userdata, flags, rc):
  131. print("On connect called")
  132. if rc != 0:
  133. if rc == 3:
  134. print ("MQTT Server unavailable")
  135. elif rc == 4:
  136. print ("MQTT Bad username or password")
  137. elif rc == 5:
  138. print ("MQTT Not authorized")
  139. else:
  140. print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc))
  141. # publish a message to signal that the service is running
  142. client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
  143. client = mqtt.Client(client_id=MQTT_CLIENT_ID)
  144. client.on_connect = on_connect
  145. client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
  146. if not MQTT_USER is None:
  147. client.username_pw_set(MQTT_USER, password=MQTT_PASS)
  148. client.connect(MQTT_HOST, MQTT_PORT, 60)
  149. client.loop_start()
  150. plasma_process = start_plasma_store()
  151. ##
  152. # Setup config defaults for cameras
  153. ##
  154. for name, config in CONFIG['cameras'].items():
  155. config['snapshots'] = {
  156. 'show_timestamp': config.get('snapshots', {}).get('show_timestamp', True)
  157. }
  158. # Queue for cameras to push tracked objects to
  159. tracked_objects_queue = mp.SimpleQueue()
  160. # Queue for clip processing
  161. event_queue = mp.Queue()
  162. # Start the shared tflite process
  163. tflite_process = EdgeTPUProcess()
  164. # start the camera processes
  165. camera_processes = {}
  166. for name, config in CONFIG['cameras'].items():
  167. # Merge the ffmpeg config with the global config
  168. ffmpeg = config.get('ffmpeg', {})
  169. ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
  170. ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
  171. ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
  172. ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
  173. ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
  174. if config.get('save_clips', False):
  175. ffmpeg_output_args = [
  176. "-f",
  177. "segment",
  178. "-segment_time",
  179. "10",
  180. "-segment_format",
  181. "mp4",
  182. "-reset_timestamps",
  183. "1",
  184. "-strftime",
  185. "1",
  186. "-c",
  187. "copy",
  188. "-an",
  189. "-map",
  190. "0",
  191. f"/cache/{name}-%Y%m%d%H%M%S.mp4"
  192. ] + ffmpeg_output_args
  193. ffmpeg_cmd = (['ffmpeg'] +
  194. ffmpeg_global_args +
  195. ffmpeg_hwaccel_args +
  196. ffmpeg_input_args +
  197. ['-i', ffmpeg_input] +
  198. ffmpeg_output_args +
  199. ['pipe:'])
  200. if 'width' in config and 'height' in config:
  201. frame_shape = (config['height'], config['width'], 3)
  202. else:
  203. frame_shape = get_frame_shape(ffmpeg_input)
  204. frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
  205. take_frame = config.get('take_frame', 1)
  206. detection_frame = mp.Value('d', 0.0)
  207. ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
  208. frame_queue = mp.SimpleQueue()
  209. camera_fps = EventsPerSecond()
  210. camera_fps.start()
  211. camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame)
  212. camera_capture.start()
  213. camera_processes[name] = {
  214. 'camera_fps': camera_fps,
  215. 'take_frame': take_frame,
  216. 'process_fps': mp.Value('d', 0.0),
  217. 'detection_fps': mp.Value('d', 0.0),
  218. 'detection_frame': detection_frame,
  219. 'read_start': mp.Value('d', 0.0),
  220. 'ffmpeg_process': ffmpeg_process,
  221. 'ffmpeg_cmd': ffmpeg_cmd,
  222. 'frame_queue': frame_queue,
  223. 'frame_shape': frame_shape,
  224. 'capture_thread': camera_capture
  225. }
  226. camera_process = mp.Process(target=track_camera, args=(name, config, GLOBAL_OBJECT_CONFIG, frame_queue, frame_shape,
  227. tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['process_fps'],
  228. camera_processes[name]['detection_fps'],
  229. camera_processes[name]['read_start'], camera_processes[name]['detection_frame']))
  230. camera_process.daemon = True
  231. camera_processes[name]['process'] = camera_process
  232. for name, camera_process in camera_processes.items():
  233. camera_process['process'].start()
  234. print(f"Camera_process started for {name}: {camera_process['process'].pid}")
  235. event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue)
  236. event_processor.start()
  237. object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue)
  238. object_processor.start()
  239. camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process)
  240. camera_watchdog.start()
  241. # create a flask app that encodes frames a mjpeg on demand
  242. app = Flask(__name__)
  243. log = logging.getLogger('werkzeug')
  244. log.setLevel(logging.ERROR)
  245. @app.route('/')
  246. def ishealthy():
  247. # return a healh
  248. return "Frigate is running. Alive and healthy!"
  249. @app.route('/debug/stack')
  250. def processor_stack():
  251. frame = sys._current_frames().get(object_processor.ident, None)
  252. if frame:
  253. return "<br>".join(traceback.format_stack(frame)), 200
  254. else:
  255. return "no frame found", 200
  256. @app.route('/debug/print_stack')
  257. def print_stack():
  258. pid = int(request.args.get('pid', 0))
  259. if pid == 0:
  260. return "missing pid", 200
  261. else:
  262. os.kill(pid, signal.SIGUSR1)
  263. return "check logs", 200
  264. @app.route('/debug/stats')
  265. def stats():
  266. stats = {}
  267. total_detection_fps = 0
  268. for name, camera_stats in camera_processes.items():
  269. total_detection_fps += camera_stats['detection_fps'].value
  270. capture_thread = camera_stats['capture_thread']
  271. stats[name] = {
  272. 'camera_fps': round(capture_thread.fps.eps(), 2),
  273. 'process_fps': round(camera_stats['process_fps'].value, 2),
  274. 'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
  275. 'detection_fps': round(camera_stats['detection_fps'].value, 2),
  276. 'read_start': camera_stats['read_start'].value,
  277. 'pid': camera_stats['process'].pid,
  278. 'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
  279. 'frame_info': {
  280. 'read': capture_thread.current_frame,
  281. 'detect': camera_stats['detection_frame'].value,
  282. 'process': object_processor.camera_data[name]['current_frame_time']
  283. }
  284. }
  285. stats['coral'] = {
  286. 'fps': round(total_detection_fps, 2),
  287. 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2),
  288. 'detection_start': tflite_process.detection_start.value,
  289. 'pid': tflite_process.detect_process.pid
  290. }
  291. rc = camera_watchdog.plasma_process.poll()
  292. stats['plasma_store_rc'] = rc
  293. return jsonify(stats)
  294. @app.route('/<camera_name>/<label>/best.jpg')
  295. def best(camera_name, label):
  296. if camera_name in CONFIG['cameras']:
  297. best_frame = object_processor.get_best(camera_name, label)
  298. if best_frame is None:
  299. best_frame = np.zeros((720,1280,3), np.uint8)
  300. best_frame = cv2.cvtColor(best_frame, cv2.COLOR_RGB2BGR)
  301. ret, jpg = cv2.imencode('.jpg', best_frame)
  302. response = make_response(jpg.tobytes())
  303. response.headers['Content-Type'] = 'image/jpg'
  304. return response
  305. else:
  306. return "Camera named {} not found".format(camera_name), 404
  307. @app.route('/<camera_name>')
  308. def mjpeg_feed(camera_name):
  309. fps = int(request.args.get('fps', '3'))
  310. height = int(request.args.get('h', '360'))
  311. if camera_name in CONFIG['cameras']:
  312. # return a multipart response
  313. return Response(imagestream(camera_name, fps, height),
  314. mimetype='multipart/x-mixed-replace; boundary=frame')
  315. else:
  316. return "Camera named {} not found".format(camera_name), 404
  317. def imagestream(camera_name, fps, height):
  318. while True:
  319. # max out at specified FPS
  320. time.sleep(1/fps)
  321. frame = object_processor.get_current_frame(camera_name)
  322. if frame is None:
  323. frame = np.zeros((height,int(height*16/9),3), np.uint8)
  324. width = int(height*frame.shape[1]/frame.shape[0])
  325. frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_LINEAR)
  326. frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
  327. ret, jpg = cv2.imencode('.jpg', frame)
  328. yield (b'--frame\r\n'
  329. b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')
  330. app.run(host='0.0.0.0', port=WEB_PORT, debug=False)
  331. object_processor.join()
  332. plasma_process.terminate()
# Script entry point: only run the service when executed directly.
if __name__ == '__main__':
    main()