detect_objects.py
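
"""Detect objects in an RTSP video stream and publish the results over MQTT.

Overview (summarized from the code below): a capture process writes frames
into shared memory; per-region threads crop and resize frames for the object
detector, while motion detection processes (currently short-circuited) gate
which regions are scanned. Helper threads track recent frames, parse and
expire detections, and publish object and motion status over MQTT. A Flask
app serves an MJPEG debug stream and the best recent "person" frame.
"""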
import os
import cv2
import imutils
import time
import datetime
import ctypes
import logging
import multiprocessing as mp
import queue
import threading
import json
from contextlib import closing
import numpy as np
from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response, send_file
import paho.mqtt.client as mqtt

from frigate.util import tonumpyarray
from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher
from frigate.objects import ObjectParser, ObjectCleaner, BestPersonFrame
from frigate.motion import detect_motion
from frigate.video import fetch_frames, FrameTracker
from frigate.object_detection import FramePrepper, PreppedQueueProcessor, detect_objects

RTSP_URL = os.getenv('RTSP_URL')

MQTT_HOST = os.getenv('MQTT_HOST')
MQTT_USER = os.getenv('MQTT_USER')
MQTT_PASS = os.getenv('MQTT_PASS')
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX')

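# REGIONS format (inferred from the parsing in main()): colon-separated
# regions, each "size,x_offset,y_offset,min_person_area,min_object_size,mask_image",
# with mask_image loaded from /config/.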
REGIONS = "300,0,0,2000,200,no-mask-300.bmp:300,300,0,2000,200,no-mask-300.bmp:300,600,0,2000,200,no-mask-300.bmp:300,900,0,2000,200,no-mask-300.bmp"
# REGIONS = "400,350,250,50"
# REGIONS = os.getenv('REGIONS')

DEBUG = (os.getenv('DEBUG') == '1')

def main():
    DETECTED_OBJECTS = []
    recent_motion_frames = {}

    # Parse selected regions
    regions = []
    for region_string in REGIONS.split(':'):
        region_parts = region_string.split(',')
        region_mask_image = cv2.imread("/config/{}".format(region_parts[5]), cv2.IMREAD_GRAYSCALE)
        region_mask = np.where(region_mask_image==[0])
        regions.append({
            'size': int(region_parts[0]),
            'x_offset': int(region_parts[1]),
            'y_offset': int(region_parts[2]),
            'min_person_area': int(region_parts[3]),
            'min_object_size': int(region_parts[4]),
            'mask': region_mask,
            # Event for motion detection signaling
            'motion_detected': mp.Event(),
            # array for prepped frame with shape (1, 300, 300, 3)
            'prepped_frame_array': mp.Array(ctypes.c_uint8, 300*300*3),
            # shared value for storing the prepped_frame_time
            'prepped_frame_time': mp.Value('d', 0.0),
            # Lock to control access to the prepped frame
            'prepped_frame_lock': mp.Lock()
        })

    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(RTSP_URL)
    ret, frame = video.read()
    if ret:
        frame_shape = frame.shape
    else:
        print("Unable to capture video stream")
        exit(1)
    video.release()

    # compute the flattened array length from the frame shape
    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
    # create shared array for storing the full frame image data
    shared_arr = mp.Array(ctypes.c_uint8, flat_array_length)
    # create shared value for storing the frame_time
    shared_frame_time = mp.Value('d', 0.0)
    # Lock to control access to the frame
    frame_lock = mp.Lock()
    # Condition for notifying that a new frame is ready
    frame_ready = mp.Condition()
    # Condition for notifying that motion status changed globally
    motion_changed = mp.Condition()

    # shared array for the prepped (300x300) frame handed to the detector
    prepped_frame_array = mp.Array(ctypes.c_uint8, 300*300*3)
    # create shared value for storing the prepped frame_time
    prepped_frame_time = mp.Value('d', 0.0)
    # Events for handshaking prepped frames to the object detection process
    prepped_frame_grabbed = mp.Event()
    prepped_frame_ready = mp.Event()
    # Condition for notifying that objects were parsed
    objects_parsed = mp.Condition()
    # Queue for detected objects
    object_queue = mp.Queue()
    # Queue for prepped frames
    prepped_frame_queue = queue.Queue(len(regions)*2)
    # shared array for the region box of the prepped frame
    prepped_frame_box = mp.Array(ctypes.c_uint16, 3)

    # shape current frame so it can be treated as an image
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # start the process to capture frames from the RTSP stream and store in a shared array
    capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
        shared_frame_time, frame_lock, frame_ready, frame_shape, RTSP_URL))
    capture_process.daemon = True

    # for each region, create a frame prep thread and a motion detection process
    detection_prep_threads = []
    motion_processes = []
    for region in regions:
        detection_prep_threads.append(FramePrepper(
            frame_arr,
            shared_frame_time,
            frame_ready,
            frame_lock,
            region['motion_detected'],
            region['size'], region['x_offset'], region['y_offset'],
            prepped_frame_queue
        ))

        motion_process = mp.Process(target=detect_motion, args=(shared_arr,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            motion_changed,
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            region['min_object_size'], region['mask'],
            DEBUG))
        motion_process.daemon = True
        motion_processes.append(motion_process)

    # thread that feeds prepped frames from the queue to the detection process
    prepped_queue_processor = PreppedQueueProcessor(
        prepped_frame_array,
        prepped_frame_time,
        prepped_frame_ready,
        prepped_frame_grabbed,
        prepped_frame_box,
        prepped_frame_queue
    )
    prepped_queue_processor.start()

    # create a process for object detection
    # if the coprocessor is doing the work, can this run as a thread
    # since it is waiting for IO?
    detection_process = mp.Process(target=detect_objects, args=(
        prepped_frame_array,
        prepped_frame_time,
        prepped_frame_ready,
        prepped_frame_grabbed,
        prepped_frame_box,
        object_queue, DEBUG
    ))
    detection_process.daemon = True

    # start a thread to store recent motion frames for processing
    frame_tracker = FrameTracker(frame_arr, shared_frame_time, frame_ready, frame_lock,
        recent_motion_frames, motion_changed, [region['motion_detected'] for region in regions])
    frame_tracker.start()

    # start a thread to store the highest scoring recent person frame
    best_person_frame = BestPersonFrame(objects_parsed, recent_motion_frames, DETECTED_OBJECTS,
        motion_changed, [region['motion_detected'] for region in regions])
    best_person_frame.start()

    # start a thread to parse objects from the queue
    object_parser = ObjectParser(object_queue, objects_parsed, DETECTED_OBJECTS)
    object_parser.start()

    # start a thread to expire objects from the detected objects list
    object_cleaner = ObjectCleaner(objects_parsed, DETECTED_OBJECTS,
        motion_changed, [region['motion_detected'] for region in regions])
    object_cleaner.start()

    # connect to mqtt and set up the last will
    def on_connect(client, userdata, flags, rc):
        print("On connect called")
        # publish a message to signal that the service is running
        client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)

    client = mqtt.Client()
    client.on_connect = on_connect
    client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
    if MQTT_USER is not None:
        client.username_pw_set(MQTT_USER, password=MQTT_PASS)
    client.connect(MQTT_HOST, 1883, 60)
    client.loop_start()

    # start a thread to publish object scores (currently only person)
    mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed, DETECTED_OBJECTS)
    mqtt_publisher.start()

    # start a thread to publish motion status
    mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed,
        [region['motion_detected'] for region in regions])
    mqtt_motion_publisher.start()

    # start the process of capturing frames
    capture_process.start()
    print("capture_process pid ", capture_process.pid)

    # start the object detection prep threads
    for detection_prep_thread in detection_prep_threads:
        detection_prep_thread.start()

    detection_process.start()
    print("detection_process pid ", detection_process.pid)

    # start the motion detection processes
    # for motion_process in motion_processes:
    #     motion_process.start()
    #     print("motion_process pid ", motion_process.pid)

    # TEMP: short circuit the motion detection so every region is always scanned
    for region in regions:
        region['motion_detected'].set()
    with motion_changed:
        motion_changed.notify_all()

    # create a flask app that encodes frames as MJPEG on demand
    app = Flask(__name__)

    @app.route('/best_person.jpg')
    def best_person():
        # serve the highest scoring recent person frame, or a black frame if none yet
        frame = np.zeros(frame_shape, np.uint8) if best_person_frame.best_frame is None else best_person_frame.best_frame
        ret, jpg = cv2.imencode('.jpg', frame)
        response = make_response(jpg.tobytes())
        response.headers['Content-Type'] = 'image/jpeg'
        return response

    @app.route('/')
    def index():
        # return a multipart response
        return Response(imagestream(),
            mimetype='multipart/x-mixed-replace; boundary=frame')

    def imagestream():
        while True:
            # max out at 5 FPS
            time.sleep(0.2)
            # make a copy of the current detected objects
            detected_objects = DETECTED_OBJECTS.copy()
            # lock and make a copy of the current frame
            with frame_lock:
                frame = frame_arr.copy()
            # convert to RGB for drawing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # draw the bounding boxes on the screen
            for obj in detected_objects:
                vis_util.draw_bounding_box_on_image_array(frame,
                    obj['ymin'],
                    obj['xmin'],
                    obj['ymax'],
                    obj['xmax'],
                    color='red',
                    thickness=2,
                    display_str_list=["{}: {}%".format(obj['name'], int(obj['score']*100))],
                    use_normalized_coordinates=False)
            # draw the regions, green if motion is currently detected
            for region in regions:
                color = (255,255,255)
                if region['motion_detected'].is_set():
                    color = (0,255,0)
                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                    (region['x_offset']+region['size'], region['y_offset']+region['size']),
                    color, 2)
            # convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # encode the image into a jpg
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    app.run(host='0.0.0.0', debug=False)

    # block until all processes and threads exit
    capture_process.join()
    for detection_prep_thread in detection_prep_threads:
        detection_prep_thread.join()
    for motion_process in motion_processes:
        motion_process.join()
    detection_process.join()
    frame_tracker.join()
    best_person_frame.join()
    object_parser.join()
    object_cleaner.join()
    mqtt_publisher.join()

if __name__ == '__main__':
    main()
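
# A minimal sketch of how this script is typically invoked (illustrative
# values; assumes a reachable RTSP camera, an MQTT broker, and the mask
# images referenced by REGIONS present in /config/):
#
#   export RTSP_URL='rtsp://user:password@camera-host/stream'
#   export MQTT_HOST='mqtt.local'
#   export MQTT_TOPIC_PREFIX='cameras/1'
#   python detect_objects.py
#
# The MJPEG debug stream is then available at http://<host>:5000/ and the
# best person snapshot at http://<host>:5000/best_person.jpg (Flask's
# default port is 5000).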