detect_objects.py

import os
import cv2
import imutils
import time
import datetime
import ctypes
import logging
import multiprocessing as mp
import threading
import json
from contextlib import closing
import numpy as np
from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response, send_file
import paho.mqtt.client as mqtt

from frigate.util import tonumpyarray
from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher
from frigate.objects import ObjectParser, ObjectCleaner, BestPersonFrame
from frigate.motion import detect_motion
from frigate.video import fetch_frames, FrameTracker
from frigate.object_detection import prep_for_detection, detect_objects

RTSP_URL = os.getenv('RTSP_URL')

MQTT_HOST = os.getenv('MQTT_HOST')
MQTT_USER = os.getenv('MQTT_USER')
MQTT_PASS = os.getenv('MQTT_PASS')
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX')

# REGIONS = "350,0,300,50:400,350,250,50:400,750,250,50"
# REGIONS = "400,350,250,50"
REGIONS = os.getenv('REGIONS')
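# each region is "size,x_offset,y_offset,min_person_area,min_object_size,mask_filename",
# with multiple regions joined by ':' (see the parsing loop in main(); the
# commented examples above use an older, shorter format)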

DEBUG = (os.getenv('DEBUG') == '1')

def main():
    DETECTED_OBJECTS = []
    recent_motion_frames = {}
    # Parse selected regions
    regions = []
    for region_string in REGIONS.split(':'):
        region_parts = region_string.split(',')
        region_mask_image = cv2.imread("/config/{}".format(region_parts[5]), cv2.IMREAD_GRAYSCALE)
        region_mask = np.where(region_mask_image == 0)
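        # np.where returns the coordinates of every black pixel in the mask
        # image; the mask is handed to detect_motion below, presumably so those
        # areas are ignored when looking for motion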
        regions.append({
            'size': int(region_parts[0]),
            'x_offset': int(region_parts[1]),
            'y_offset': int(region_parts[2]),
            'min_person_area': int(region_parts[3]),
            'min_object_size': int(region_parts[4]),
            'mask': region_mask,
            # Event for motion detection signaling
            'motion_detected': mp.Event(),
            # create a shared array for storing 10 detected objects
            # note: this must be a double even though the value you are storing
            # is a float. otherwise it stops updating the value in shared
            # memory. probably something to do with the size of the memory block
            'output_array': mp.Array(ctypes.c_double, 6*10)
        })

    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(RTSP_URL)
    ret, frame = video.read()
    if ret:
        frame_shape = frame.shape
    else:
        print("Unable to capture video stream")
        exit(1)
    video.release()

    # compute the flattened array length from the frame shape
    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
    # create a shared array for storing the full frame image data
    shared_arr = mp.Array(ctypes.c_uint8, flat_array_length)
    # create a shared value for storing the frame_time
    shared_frame_time = mp.Value('d', 0.0)
    # Lock to control access to the frame
    frame_lock = mp.Lock()
    # Condition for notifying that a new frame is ready
    frame_ready = mp.Condition()
    # Condition for notifying that motion status changed globally
    motion_changed = mp.Condition()
    # Condition for notifying that objects were parsed
    objects_parsed = mp.Condition()
    # Queue for detected objects
    object_queue = mp.Queue()

    # array for the prepped frame with shape (1, 300, 300, 3)
    prepped_frame_array = mp.Array(ctypes.c_uint8, 300*300*3)
    # shared value for storing the prepped_frame_time
    prepped_frame_time = mp.Value('d', 0.0)
    # Condition for notifying that a new prepped frame is ready
    prepped_frame_ready = mp.Condition()
    # Lock to control access to the prepped frame
    prepped_frame_lock = mp.Lock()
    # array for the prepped frame box [x1, y1, x2, y2]
    prepped_frame_box = mp.Array(ctypes.c_uint16, 4)
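    # note: a single prepped-frame buffer is shared by every region's prep
    # process and the one detection process, so they take turns via prepped_frame_lock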

    # shape the current frame so it can be treated as an image
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # start the process to capture frames from the RTSP stream and store them in the shared array
    capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
        shared_frame_time, frame_lock, frame_ready, frame_shape, RTSP_URL))
    capture_process.daemon = True
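    # daemon processes are terminated automatically when the main process exits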

    # for each region, start a separate process for motion detection and object detection prep
    detection_prep_processes = []
    motion_processes = []
    for region in regions:
        # possibly try putting these on threads and putting prepped
        # frames in a queue
        detection_prep_process = mp.Process(target=prep_for_detection, args=(shared_arr,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            prepped_frame_array, prepped_frame_time, prepped_frame_ready,
            prepped_frame_lock, prepped_frame_box))
        detection_prep_process.daemon = True
        detection_prep_processes.append(detection_prep_process)

        motion_process = mp.Process(target=detect_motion, args=(shared_arr,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            motion_changed,
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            region['min_object_size'], region['mask'],
            DEBUG))
        motion_process.daemon = True
        motion_processes.append(motion_process)

    # create a process for object detection
    detection_process = mp.Process(target=detect_objects, args=(
        prepped_frame_array, prepped_frame_time,
        prepped_frame_lock, prepped_frame_ready,
        prepped_frame_box, object_queue, DEBUG
    ))
    detection_process.daemon = True

    # start a thread to store recent motion frames for processing
    frame_tracker = FrameTracker(frame_arr, shared_frame_time, frame_ready, frame_lock,
        recent_motion_frames, motion_changed, [region['motion_detected'] for region in regions])
    frame_tracker.start()

    # start a thread to store the highest scoring recent person frame
    best_person_frame = BestPersonFrame(objects_parsed, recent_motion_frames, DETECTED_OBJECTS,
        motion_changed, [region['motion_detected'] for region in regions])
    best_person_frame.start()

    # start a thread to parse objects from the queue
    object_parser = ObjectParser(object_queue, objects_parsed, DETECTED_OBJECTS)
    object_parser.start()

    # start a thread to expire objects from the detected objects list
    object_cleaner = ObjectCleaner(objects_parsed, DETECTED_OBJECTS,
        motion_changed, [region['motion_detected'] for region in regions])
    object_cleaner.start()

    # connect to mqtt and set up the last will
    def on_connect(client, userdata, flags, rc):
        print("On connect called")
        # publish a message to signal that the service is running
        client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
    client = mqtt.Client()
    client.on_connect = on_connect
    client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
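    # if this client disconnects uncleanly, the broker publishes the 'offline'
    # will to the availability topic on our behalf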
    if MQTT_USER is not None:
        client.username_pw_set(MQTT_USER, password=MQTT_PASS)
    client.connect(MQTT_HOST, 1883, 60)
    client.loop_start()

    # start a thread to publish object scores (currently only person)
    mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed, DETECTED_OBJECTS)
    mqtt_publisher.start()

    # start a thread to publish motion status
    mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed,
        [region['motion_detected'] for region in regions])
    mqtt_motion_publisher.start()

    # start the process of capturing frames
    capture_process.start()
    print("capture_process pid ", capture_process.pid)

    # start the object detection prep processes
    for detection_prep_process in detection_prep_processes:
        detection_prep_process.start()
        print("detection_prep_process pid ", detection_prep_process.pid)

    detection_process.start()
    print("detection_process pid ", detection_process.pid)

    # start the motion detection processes
    # for motion_process in motion_processes:
    #     motion_process.start()
    #     print("motion_process pid ", motion_process.pid)

    # TEMP: short circuit the motion detection
    for region in regions:
        region['motion_detected'].set()
    with motion_changed:
        motion_changed.notify_all()
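    # with every region's motion event permanently set, the pipeline behaves as
    # if motion is always present, so frames are prepped and detected continuously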

    # create a flask app that encodes frames as MJPEG on demand
    app = Flask(__name__)

    @app.route('/best_person.jpg')
    def best_person():
        frame = np.zeros(frame_shape, np.uint8) if best_person_frame.best_frame is None else best_person_frame.best_frame
        ret, jpg = cv2.imencode('.jpg', frame)
        response = make_response(jpg.tobytes())
        response.headers['Content-Type'] = 'image/jpeg'
        return response

    @app.route('/')
    def index():
        # return a multipart response
        return Response(imagestream(),
            mimetype='multipart/x-mixed-replace; boundary=frame')
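    # multipart/x-mixed-replace keeps the connection open; each '--frame' part
    # yielded below replaces the previous JPEG in the browser, producing a live stream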
    def imagestream():
        while True:
            # max out at 5 FPS
            time.sleep(0.2)
            # make a copy of the current detected objects
            detected_objects = DETECTED_OBJECTS.copy()
            # lock and make a copy of the current frame
            with frame_lock:
                frame = frame_arr.copy()
            # convert to RGB for drawing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # draw the bounding boxes on the frame
            for obj in detected_objects:
                vis_util.draw_bounding_box_on_image_array(frame,
                    obj['ymin'],
                    obj['xmin'],
                    obj['ymax'],
                    obj['xmax'],
                    color='red',
                    thickness=2,
                    display_str_list=["{}: {}%".format(obj['name'], int(obj['score']*100))],
                    use_normalized_coordinates=False)

            # draw the region boxes, green when motion is detected
            for region in regions:
                color = (255,255,255)
                if region['motion_detected'].is_set():
                    color = (0,255,0)
                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                    (region['x_offset']+region['size'], region['y_offset']+region['size']),
                    color, 2)

            # convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # encode the image into a jpg
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    app.run(host='0.0.0.0', debug=False)
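    # app.run() blocks, so the joins below only run once the Flask server exits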
    capture_process.join()
    for detection_prep_process in detection_prep_processes:
        detection_prep_process.join()
    for motion_process in motion_processes:
        motion_process.join()
    detection_process.join()
    frame_tracker.join()
    best_person_frame.join()
    object_parser.join()
    object_cleaner.join()
    mqtt_publisher.join()

if __name__ == '__main__':
    main()