import os
import cv2
import imutils
import time
import datetime
import ctypes
import logging
import multiprocessing as mp
import queue
import threading
import json
from contextlib import closing

import numpy as np
from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response, send_file
import paho.mqtt.client as mqtt

from frigate.util import tonumpyarray
from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher
from frigate.objects import ObjectParser, ObjectCleaner, BestPersonFrame
from frigate.motion import detect_motion
from frigate.video import fetch_frames, FrameTracker
from frigate.object_detection import FramePrepper, PreppedQueueProcessor, detect_objects

RTSP_URL = os.getenv('RTSP_URL')

MQTT_HOST = os.getenv('MQTT_HOST')
MQTT_USER = os.getenv('MQTT_USER')
MQTT_PASS = os.getenv('MQTT_PASS')
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX')

# REGIONS = "300,0,0,2000,200,no-mask-300.bmp:300,300,0,2000,200,no-mask-300.bmp:300,600,0,2000,200,no-mask-300.bmp:300,900,0,2000,200,no-mask-300.bmp:300,0,300,2000,200,no-mask-300.bmp:300,300,300,2000,200,no-mask-300.bmp:300,600,300,2000,200,no-mask-300.bmp:300,900,300,2000,200,no-mask-300.bmp"
# REGIONS = "400,350,250,50"
REGIONS = os.getenv('REGIONS')
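# REGIONS is a colon-separated list of regions, each beginning with
# 'size,x_offset,y_offset,min_person_area'; only these first four fields
# of each entry are parsed in main() below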

DEBUG = (os.getenv('DEBUG') == '1')

def main():
    DETECTED_OBJECTS = []
    recent_frames = {}

    # Parse selected regions
    regions = []
    for region_string in REGIONS.split(':'):
        region_parts = region_string.split(',')
        regions.append({
            'size': int(region_parts[0]),
            'x_offset': int(region_parts[1]),
            'y_offset': int(region_parts[2]),
            'min_person_area': int(region_parts[3]),
            # array for prepped frame with shape (1, 300, 300, 3)
            'prepped_frame_array': mp.Array(ctypes.c_uint8, 300*300*3),
            # shared value for storing the prepped_frame_time
            'prepped_frame_time': mp.Value('d', 0.0),
            # Lock to control access to the prepped frame
            'prepped_frame_lock': mp.Lock()
        })
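    # each region is a square crop of the full frame; 'size' is the side
    # length in pixels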

    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(RTSP_URL)
    ret, frame = video.read()
    if ret:
        frame_shape = frame.shape
    else:
        print("Unable to capture video stream")
        exit(1)
    video.release()
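    # the probe connection is released here; the capture process below opens
    # its own connection to the stream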

    # compute the flattened array length from the array shape
    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
    # create shared array for storing the full frame image data
    shared_arr = mp.Array(ctypes.c_uint8, flat_array_length)
    # create shared value for storing the frame_time
    shared_frame_time = mp.Value('d', 0.0)
    # Lock to control access to the frame
    frame_lock = mp.Lock()
    # Condition for notifying that a new frame is ready
    frame_ready = mp.Condition()
    # Condition for notifying that objects were parsed
    objects_parsed = mp.Condition()
    # Queue for detected objects
    object_queue = queue.Queue()
    # Queue for prepped frames
    prepped_frame_queue = queue.Queue(len(regions)*2)
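    # the prepped frame queue is bounded at two frames per region so the prep
    # threads cannot run unboundedly ahead of the detector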

    # shape current frame so it can be treated as an image
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
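    # frame_arr should be a view over the shared buffer rather than a copy,
    # so frames written by the capture process are visible here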

    # create the process to capture frames from the RTSP stream and store
    # them in the shared array (started below, after MQTT is connected)
    capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
        shared_frame_time, frame_lock, frame_ready, frame_shape, RTSP_URL))
    capture_process.daemon = True
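    # daemon=True so the capture process is killed when the main process exits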

    # for each region, create a separate thread to resize the region and prep
    # it for detection (started below, once the capture process is running)
    detection_prep_threads = []
    for region in regions:
        detection_prep_threads.append(FramePrepper(
            frame_arr,
            shared_frame_time,
            frame_ready,
            frame_lock,
            region['size'], region['x_offset'], region['y_offset'],
            prepped_frame_queue
        ))
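    # all regions share a single prepped_frame_queue, so one queue processor
    # serves every region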

    prepped_queue_processor = PreppedQueueProcessor(
        prepped_frame_queue,
        object_queue
    )
    prepped_queue_processor.start()
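    # the queue processor consumes prepped frames and pushes detections onto
    # object_queue for the parser thread below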

    # start a thread to store recent motion frames for processing
    frame_tracker = FrameTracker(frame_arr, shared_frame_time, frame_ready, frame_lock,
        recent_frames)
    frame_tracker.start()

    # start a thread to store the highest scoring recent person frame
    best_person_frame = BestPersonFrame(objects_parsed, recent_frames, DETECTED_OBJECTS)
    best_person_frame.start()

    # start a thread to parse objects from the queue
    object_parser = ObjectParser(object_queue, objects_parsed, DETECTED_OBJECTS, regions)
    object_parser.start()

    # start a thread to expire objects from the detected objects list
    object_cleaner = ObjectCleaner(objects_parsed, DETECTED_OBJECTS)
    object_cleaner.start()

    # connect to mqtt and set up the last will
    def on_connect(client, userdata, flags, rc):
        print("On connect called")
        # publish a message to signal that the service is running
        client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
    client = mqtt.Client()
    client.on_connect = on_connect
    # the will must be set before connecting
    client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
    if MQTT_USER is not None:
        client.username_pw_set(MQTT_USER, password=MQTT_PASS)
    client.connect(MQTT_HOST, 1883, 60)
    client.loop_start()
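    # loop_start() runs the MQTT network loop in a background thread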

    # start a thread to publish object scores (currently only person)
    mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed, DETECTED_OBJECTS)
    mqtt_publisher.start()

    # start the process of capturing frames
    capture_process.start()
    print("capture_process pid ", capture_process.pid)

    # start the object detection prep threads
    for detection_prep_thread in detection_prep_threads:
        detection_prep_thread.start()

    # create a flask app that encodes frames as mjpeg on demand
    app = Flask(__name__)

    @app.route('/best_person.jpg')
    def best_person():
        # fall back to a black frame if no person has been detected yet
        if best_person_frame.best_frame is None:
            frame = np.zeros(frame_shape, np.uint8)
        else:
            frame = best_person_frame.best_frame
        ret, jpg = cv2.imencode('.jpg', frame)
        response = make_response(jpg.tobytes())
        response.headers['Content-Type'] = 'image/jpeg'
        return response

    @app.route('/')
    def index():
        # return a multipart response
        return Response(imagestream(),
            mimetype='multipart/x-mixed-replace; boundary=frame')
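    # note: the 'frame' boundary in the mimetype above must match the
    # b'--frame' marker emitted by imagestream() below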

    def imagestream():
        while True:
            # max out at 5 FPS
            time.sleep(0.2)
            # make a copy of the current detected objects
            detected_objects = DETECTED_OBJECTS.copy()
            # lock and make a copy of the current frame
            with frame_lock:
                frame = frame_arr.copy()
            # convert to RGB for drawing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # draw the bounding boxes on the screen
            for obj in detected_objects:
                vis_util.draw_bounding_box_on_image_array(frame,
                    obj['ymin'],
                    obj['xmin'],
                    obj['ymax'],
                    obj['xmax'],
                    color='red',
                    thickness=2,
                    display_str_list=["{}: {}%".format(obj['name'], int(obj['score']*100))],
                    use_normalized_coordinates=False)
            # outline the configured regions
            for region in regions:
                color = (255,255,255)
                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                    (region['x_offset']+region['size'], region['y_offset']+region['size']),
                    color, 2)
            # convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # encode the image into a jpg
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    app.run(host='0.0.0.0', debug=False)
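    # app.run() blocks, so the joins below only run once the server exits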

    capture_process.join()
    for detection_prep_thread in detection_prep_threads:
        detection_prep_thread.join()
    frame_tracker.join()
    best_person_frame.join()
    object_parser.join()
    object_cleaner.join()
    mqtt_publisher.join()

if __name__ == '__main__':
    main()