# Frigate NVR entry point: camera capture, object detection, MQTT publishing,
# and the Flask debug/streaming API.
- import os
- import signal
- import sys
- import traceback
- import signal
- import cv2
- import time
- import datetime
- import queue
- import yaml
- import threading
- import multiprocessing as mp
- import subprocess as sp
- import numpy as np
- import logging
- from flask import Flask, Response, make_response, jsonify, request
- import paho.mqtt.client as mqtt
- from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
- from frigate.object_processing import TrackedObjectProcessor
- from frigate.events import EventProcessor
- from frigate.util import EventsPerSecond
- from frigate.edgetpu import EdgeTPUProcess
# Environment variables prefixed with FRIGATE_ are collected so they can be
# substituted into config values (currently the MQTT password).
FRIGATE_VARS = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}

# Load the user configuration from the fixed in-container path.
with open('/config/config.yml') as f:
    CONFIG = yaml.safe_load(f)

# MQTT connection settings; 'host' is the only required key, everything else
# falls back to a default.
MQTT_HOST = CONFIG['mqtt']['host']
MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883)
MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate')
MQTT_USER = CONFIG.get('mqtt', {}).get('user')
MQTT_PASS = CONFIG.get('mqtt', {}).get('password')
if not MQTT_PASS is None:
    # allow FRIGATE_* environment variables to be templated into the password,
    # e.g. password: "{FRIGATE_MQTT_PASSWORD}"
    MQTT_PASS = MQTT_PASS.format(**FRIGATE_VARS)
MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate')

# Set the default FFmpeg config
FFMPEG_CONFIG = CONFIG.get('ffmpeg', {})
FFMPEG_DEFAULT_CONFIG = {
    'global_args': FFMPEG_CONFIG.get('global_args',
        ['-hide_banner','-loglevel','panic']),
    'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args',
        []),
    # low-latency RTSP input defaults; '-stimeout' aborts a stalled connection
    # after 5s (microseconds)
    'input_args': FFMPEG_CONFIG.get('input_args',
        ['-avoid_negative_ts', 'make_zero',
         '-fflags', 'nobuffer',
         '-flags', 'low_delay',
         '-strict', 'experimental',
         '-fflags', '+genpts+discardcorrupt',
         '-rtsp_transport', 'tcp',
         '-stimeout', '5000000',
         '-use_wallclock_as_timestamps', '1']),
    # raw rgb24 frames on stdout so the capture thread can read fixed-size frames
    'output_args': FFMPEG_CONFIG.get('output_args',
        ['-f', 'rawvideo',
         '-pix_fmt', 'rgb24'])
}

# Object tracking defaults shared by all cameras unless overridden per camera.
GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})

# Flask web server port and debug flag.
WEB_PORT = CONFIG.get('web_port', 5000)
DEBUG = (CONFIG.get('debug', '0') == '1')
# Optional detector device selector passed to EdgeTPUProcess.
TENSORFLOW_DEVICE = CONFIG.get('tensorflow_device')
class CameraWatchdog(threading.Thread):
    """Background thread that supervises the shared detection process, the
    per-camera tracking processes, and the ffmpeg capture threads, restarting
    any of them that die or stall.
    """

    def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, stop_event, detection_pipes=None):
        """
        Args:
            camera_processes: dict of per-camera state dicts (process handles,
                queues, shared counters) as built in main().
            config: per-camera configuration (CONFIG['cameras']).
            tflite_process: the shared EdgeTPUProcess instance.
            tracked_objects_queue: queue the tracking processes publish to.
            stop_event: threading.Event signalling shutdown.
            detection_pipes: optional dict of camera name -> mp.Pipe() pair, as
                built in main(). When provided, a restarted track_camera
                process receives its detection result connection just like the
                original start in main(). Defaults to None for backward
                compatibility with existing callers.
        """
        threading.Thread.__init__(self)
        self.camera_processes = camera_processes
        self.config = config
        self.tflite_process = tflite_process
        self.tracked_objects_queue = tracked_objects_queue
        self.stop_event = stop_event
        self.detection_pipes = detection_pipes

    def run(self):
        # give everything a chance to start before the first check
        time.sleep(10)
        while True:
            # wait a bit before checking
            time.sleep(10)

            if self.stop_event.is_set():
                print("Exiting watchdog...")
                break

            now = datetime.datetime.now().timestamp()

            # check the detection process: a detection_start timestamp more
            # than 10s old means a detection was submitted but never finished
            detection_start = self.tflite_process.detection_start.value
            if (detection_start > 0.0 and
                now - detection_start > 10):
                print("Detection appears to be stuck. Restarting detection process")
                self.tflite_process.start_or_restart()
            elif not self.tflite_process.detect_process.is_alive():
                print("Detection appears to have stopped. Restarting detection process")
                self.tflite_process.start_or_restart()

            # check the camera processes
            for name, camera_process in self.camera_processes.items():
                process = camera_process['process']
                if not process.is_alive():
                    print(f"Track process for {name} is not alive. Starting again...")
                    # reset the shared counters before restarting
                    camera_process['process_fps'].value = 0.0
                    camera_process['detection_fps'].value = 0.0
                    camera_process['read_start'].value = 0.0
                    # BUGFIX: main() starts track_camera WITH the camera's
                    # detection result connection (detection_pipes[name][0]);
                    # the previous restart path omitted it, calling
                    # track_camera with the wrong arity. Include it whenever
                    # the pipes were provided to the watchdog.
                    if self.detection_pipes is not None:
                        args = (name, self.config[name], camera_process['frame_queue'],
                            camera_process['frame_shape'], self.tflite_process.detection_queue,
                            self.detection_pipes[name][0], self.tracked_objects_queue,
                            camera_process['process_fps'], camera_process['detection_fps'],
                            camera_process['read_start'], camera_process['detection_frame'], self.stop_event)
                    else:
                        # legacy argument list, kept for callers that do not
                        # pass detection_pipes
                        args = (name, self.config[name], camera_process['frame_queue'],
                            camera_process['frame_shape'], self.tflite_process.detection_queue,
                            self.tracked_objects_queue,
                            camera_process['process_fps'], camera_process['detection_fps'],
                            camera_process['read_start'], camera_process['detection_frame'], self.stop_event)
                    process = mp.Process(target=track_camera, args=args)
                    process.daemon = True
                    camera_process['process'] = process
                    process.start()
                    print(f"Track process started for {name}: {process.pid}")

                if not camera_process['capture_thread'].is_alive():
                    # capture thread died: restart ffmpeg and spin up a fresh
                    # capture thread against the existing frame queue
                    frame_shape = camera_process['frame_shape']
                    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
                    ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
                    camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
                        camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'], self.stop_event)
                    camera_capture.start()
                    camera_process['ffmpeg_process'] = ffmpeg_process
                    camera_process['capture_thread'] = camera_capture
                elif now - camera_process['capture_thread'].current_frame.value > 5:
                    # capture thread alive but no frames for 5s: ffmpeg is
                    # hung, so terminate it and let the branch above restart it
                    # on the next pass
                    print(f"No frames received from {name} in 5 seconds. Exiting ffmpeg...")
                    ffmpeg_process = camera_process['ffmpeg_process']
                    ffmpeg_process.terminate()
                    try:
                        print("Waiting for ffmpeg to exit gracefully...")
                        ffmpeg_process.communicate(timeout=30)
                    except sp.TimeoutExpired:
                        print("FFmpeg didnt exit. Force killing...")
                        ffmpeg_process.kill()
                        ffmpeg_process.communicate()
def main():
    """Frigate entry point.

    Connects to MQTT with a last-will availability topic, builds per-camera
    ffmpeg capture pipelines and tracking processes, starts the shared
    detection process, event/object processors and watchdog, installs signal
    handlers for clean shutdown, and finally serves the Flask debug/streaming
    API (blocking) until the process exits.
    """
    stop_event = threading.Event()
    # connect to mqtt and setup last will
    def on_connect(client, userdata, flags, rc):
        # paho callback: log connection failures, then publish availability
        print("On connect called")
        if rc != 0:
            if rc == 3:
                print ("MQTT Server unavailable")
            elif rc == 4:
                print ("MQTT Bad username or password")
            elif rc == 5:
                print ("MQTT Not authorized")
            else:
                print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc))
        # publish a message to signal that the service is running
        client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)

    client = mqtt.Client(client_id=MQTT_CLIENT_ID)
    client.on_connect = on_connect
    # the broker publishes 'offline' on our behalf if the connection drops
    client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
    if not MQTT_USER is None:
        client.username_pw_set(MQTT_USER, password=MQTT_PASS)
    client.connect(MQTT_HOST, MQTT_PORT, 60)
    client.loop_start()

    ##
    # Setup config defaults for cameras
    ##
    for name, config in CONFIG['cameras'].items():
        config['snapshots'] = {
            'show_timestamp': config.get('snapshots', {}).get('show_timestamp', True),
            'draw_zones': config.get('snapshots', {}).get('draw_zones', False)
        }
        config['zones'] = config.get('zones', {})

    # Queue for cameras to push tracked objects to
    tracked_objects_queue = mp.Queue()

    # Queue for clip processing
    event_queue = mp.Queue()

    # create the detection pipes: one one-way Pipe per camera; the write ends
    # go to the shared detection process, the read ends to the camera processes
    detection_pipes = {}
    for name in CONFIG['cameras'].keys():
        detection_pipes[name] = mp.Pipe(duplex=False)

    # Start the shared tflite process
    tflite_process = EdgeTPUProcess(result_connections={ key:value[1] for (key,value) in detection_pipes.items() }, tf_device=TENSORFLOW_DEVICE)

    # create the camera processes
    camera_processes = {}
    for name, config in CONFIG['cameras'].items():
        # Merge the ffmpeg config with the global config
        ffmpeg = config.get('ffmpeg', {})
        ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
        ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
        ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
        ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
        ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
        # honor an explicit output frame rate when configured
        if not config.get('fps') is None:
            ffmpeg_output_args = ["-r", str(config.get('fps'))] + ffmpeg_output_args
        # when clips are enabled, additionally write 10s mp4 segments to /cache
        # (stream-copied, audio stripped) for the event processor to assemble
        if config.get('save_clips', {}).get('enabled', False):
            ffmpeg_output_args = [
                "-f",
                "segment",
                "-segment_time",
                "10",
                "-segment_format",
                "mp4",
                "-reset_timestamps",
                "1",
                "-strftime",
                "1",
                "-c",
                "copy",
                "-an",
                "-map",
                "0",
                f"/cache/{name}-%Y%m%d%H%M%S.mp4"
            ] + ffmpeg_output_args
        ffmpeg_cmd = (['ffmpeg'] +
            ffmpeg_global_args +
            ffmpeg_hwaccel_args +
            ffmpeg_input_args +
            ['-i', ffmpeg_input] +
            ffmpeg_output_args +
            ['pipe:'])

        # prefer configured dimensions; otherwise probe the stream
        if 'width' in config and 'height' in config:
            frame_shape = (config['height'], config['width'], 3)
        else:
            frame_shape = get_frame_shape(ffmpeg_input)

        config['frame_shape'] = frame_shape

        frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
        take_frame = config.get('take_frame', 1)

        # shared timestamp of the frame currently in detection
        detection_frame = mp.Value('d', 0.0)

        ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
        frame_queue = mp.Queue()
        camera_fps = EventsPerSecond()
        camera_fps.start()
        camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame, stop_event)
        camera_capture.start()

        # everything the watchdog and the /debug/stats endpoint need, by camera
        camera_processes[name] = {
            'camera_fps': camera_fps,
            'take_frame': take_frame,
            'process_fps': mp.Value('d', 0.0),
            'detection_fps': mp.Value('d', 0.0),
            'detection_frame': detection_frame,
            'read_start': mp.Value('d', 0.0),
            'ffmpeg_process': ffmpeg_process,
            'ffmpeg_cmd': ffmpeg_cmd,
            'frame_queue': frame_queue,
            'frame_shape': frame_shape,
            'capture_thread': camera_capture
        }

        # merge global object config into camera object config
        camera_objects_config = config.get('objects', {})
        # get objects to track for camera
        objects_to_track = camera_objects_config.get('track', GLOBAL_OBJECT_CONFIG.get('track', ['person']))
        # get object filters
        object_filters = camera_objects_config.get('filters', GLOBAL_OBJECT_CONFIG.get('filters', {}))
        config['objects'] = {
            'track': objects_to_track,
            'filters': object_filters
        }

        camera_process = mp.Process(target=track_camera, args=(name, config, frame_queue, frame_shape,
            tflite_process.detection_queue, detection_pipes[name][0], tracked_objects_queue, camera_processes[name]['process_fps'],
            camera_processes[name]['detection_fps'],
            camera_processes[name]['read_start'], camera_processes[name]['detection_frame'], stop_event))
        camera_process.daemon = True
        camera_processes[name]['process'] = camera_process

    # start the camera_processes
    for name, camera_process in camera_processes.items():
        camera_process['process'].start()
        print(f"Camera_process started for {name}: {camera_process['process'].pid}")

    event_processor = EventProcessor(CONFIG, camera_processes, '/cache', '/clips', event_queue, stop_event)
    event_processor.start()

    object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue, stop_event)
    object_processor.start()

    # NOTE(review): the watchdog restarts track_camera WITHOUT the
    # detection_pipes[name][0] connection passed above — verify the restart
    # argument list matches track_camera's signature.
    camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, stop_event)
    camera_watchdog.start()

    def receiveSignal(signalNumber, frame):
        # SIGTERM/SIGINT handler: request shutdown, join all workers, exit
        print('Received:', signalNumber)
        stop_event.set()
        event_processor.join()
        object_processor.join()
        camera_watchdog.join()
        for camera_process in camera_processes.values():
            camera_process['capture_thread'].join()
        tflite_process.stop()
        sys.exit()

    signal.signal(signal.SIGTERM, receiveSignal)
    signal.signal(signal.SIGINT, receiveSignal)

    # create a flask app that encodes frames a mjpeg on demand
    app = Flask(__name__)
    log = logging.getLogger('werkzeug')
    log.setLevel(logging.ERROR)

    @app.route('/')
    def ishealthy():
        # health check endpoint
        return "Frigate is running. Alive and healthy!"

    @app.route('/debug/stack')
    def processor_stack():
        # dump the object processor thread's current python stack as HTML
        frame = sys._current_frames().get(object_processor.ident, None)
        if frame:
            return "<br>".join(traceback.format_stack(frame)), 200
        else:
            return "no frame found", 200

    @app.route('/debug/print_stack')
    def print_stack():
        # ask a child process (by ?pid=) to print its stack via SIGUSR1
        pid = int(request.args.get('pid', 0))
        if pid == 0:
            return "missing pid", 200
        else:
            os.kill(pid, signal.SIGUSR1)
            return "check logs", 200

    @app.route('/debug/stats')
    def stats():
        # per-camera and detector throughput/progress counters as JSON
        stats = {}
        total_detection_fps = 0

        for name, camera_stats in camera_processes.items():
            total_detection_fps += camera_stats['detection_fps'].value
            capture_thread = camera_stats['capture_thread']
            stats[name] = {
                'camera_fps': round(capture_thread.fps.eps(), 2),
                'process_fps': round(camera_stats['process_fps'].value, 2),
                'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
                'detection_fps': round(camera_stats['detection_fps'].value, 2),
                'read_start': camera_stats['read_start'].value,
                'pid': camera_stats['process'].pid,
                'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
                'frame_info': {
                    'read': capture_thread.current_frame.value,
                    'detect': camera_stats['detection_frame'].value,
                    'process': object_processor.camera_data[name]['current_frame_time']
                }
            }

        stats['coral'] = {
            'fps': round(total_detection_fps, 2),
            'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2),
            'detection_start': tflite_process.detection_start.value,
            'pid': tflite_process.detect_process.pid
        }

        return jsonify(stats)

    @app.route('/<camera_name>/<label>/best.jpg')
    def best(camera_name, label):
        # best snapshot for a label, optionally cropped (?crop=1) and
        # resized (?h=) preserving aspect ratio
        if camera_name in CONFIG['cameras']:
            best_object = object_processor.get_best(camera_name, label)
            best_frame = best_object.get('frame', np.zeros((720,1280,3), np.uint8))

            crop = bool(request.args.get('crop', 0, type=int))
            if crop:
                region = best_object.get('region', [0,0,300,300])
                best_frame = best_frame[region[1]:region[3], region[0]:region[2]]

            height = int(request.args.get('h', str(best_frame.shape[0])))
            width = int(height*best_frame.shape[1]/best_frame.shape[0])

            best_frame = cv2.resize(best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
            # frames are stored RGB; OpenCV's encoder expects BGR
            best_frame = cv2.cvtColor(best_frame, cv2.COLOR_RGB2BGR)
            ret, jpg = cv2.imencode('.jpg', best_frame)
            response = make_response(jpg.tobytes())
            response.headers['Content-Type'] = 'image/jpg'
            return response
        else:
            return "Camera named {} not found".format(camera_name), 404

    @app.route('/<camera_name>')
    def mjpeg_feed(camera_name):
        # live MJPEG stream at the requested ?fps= and ?h=
        fps = int(request.args.get('fps', '3'))
        height = int(request.args.get('h', '360'))
        if camera_name in CONFIG['cameras']:
            # return a multipart response
            return Response(imagestream(camera_name, fps, height),
                mimetype='multipart/x-mixed-replace; boundary=frame')
        else:
            return "Camera named {} not found".format(camera_name), 404

    @app.route('/<camera_name>/latest.jpg')
    def latest_frame(camera_name):
        # most recent frame as a JPEG, resized to ?h= if given
        if camera_name in CONFIG['cameras']:
            # max out at specified FPS
            frame = object_processor.get_current_frame(camera_name)
            if frame is None:
                # no frame yet: serve a black placeholder
                frame = np.zeros((720,1280,3), np.uint8)

            height = int(request.args.get('h', str(frame.shape[0])))
            width = int(height*frame.shape[1]/frame.shape[0])

            frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            ret, jpg = cv2.imencode('.jpg', frame)
            response = make_response(jpg.tobytes())
            response.headers['Content-Type'] = 'image/jpg'
            return response
        else:
            return "Camera named {} not found".format(camera_name), 404

    def imagestream(camera_name, fps, height):
        # generator yielding multipart JPEG frames at roughly the given fps
        while True:
            # max out at specified FPS
            time.sleep(1/fps)
            frame = object_processor.get_current_frame(camera_name)
            if frame is None:
                # no frame yet: emit a black 16:9 placeholder at the requested height
                frame = np.zeros((height,int(height*16/9),3), np.uint8)

            width = int(height*frame.shape[1]/frame.shape[0])

            frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_LINEAR)
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    # blocks here until the web server shuts down
    app.run(host='0.0.0.0', port=WEB_PORT, debug=False)

    object_processor.join()
# Script entry point.
if __name__ == '__main__':
    main()