detect_objects.py

import os
import cv2
import imutils
import time
import datetime
import ctypes
import logging
import multiprocessing as mp
import threading
import json
from contextlib import closing
import numpy as np
import tensorflow as tf
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response
import paho.mqtt.client as mqtt
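
# Rough overview of how the pieces below fit together (inferred from the code
# in this file): a single capture process reads frames from the RTSP stream
# into a shared memory array; per-region motion detection processes gate
# per-region TensorFlow detection processes; an ObjectParser thread flattens
# the per-region results into DETECTED_OBJECTS; an MqttPublisher thread
# publishes changes over MQTT; and a Flask app serves an annotated MJPEG
# debug stream.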

RTSP_URL = os.getenv('RTSP_URL')

# Path to the frozen detection graph. This is the actual model used for object detection.
PATH_TO_CKPT = '/frozen_inference_graph.pb'
# List of the strings that are used to add the correct label to each box.
PATH_TO_LABELS = '/label_map.pbtext'

MQTT_HOST = os.getenv('MQTT_HOST')
MQTT_MOTION_TOPIC = os.getenv('MQTT_MOTION_TOPIC')
MQTT_OBJECT_TOPIC = os.getenv('MQTT_OBJECT_TOPIC')
MQTT_OBJECT_CLASSES = os.getenv('MQTT_OBJECT_CLASSES')

# TODO: make dynamic?
NUM_CLASSES = 90

# REGIONS = "350,0,300,50:400,350,250,50:400,750,250,50"
# REGIONS = "400,350,250,50"
REGIONS = os.getenv('REGIONS')
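# REGIONS is a colon-delimited list of regions, each a comma-separated
#   size,x_offset,y_offset,min_object_size
# (see the parsing in main() below); the commented examples above follow
# this format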

DETECTED_OBJECTS = []

# Loading label map
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES,
                                                            use_display_name=True)
category_index = label_map_util.create_category_index(categories)

def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_offset, region_y_offset):
    # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
    image_np_expanded = np.expand_dims(cropped_frame, axis=0)
    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
    # Each box represents a part of the image where a particular object was detected.
    boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
    # Each score represents the level of confidence for each of the objects.
    # The score is shown on the result image, together with the class label.
    scores = detection_graph.get_tensor_by_name('detection_scores:0')
    classes = detection_graph.get_tensor_by_name('detection_classes:0')
    num_detections = detection_graph.get_tensor_by_name('num_detections:0')
    # Actual detection.
    (boxes, scores, classes, num_detections) = sess.run(
        [boxes, scores, classes, num_detections],
        feed_dict={image_tensor: image_np_expanded})

    # build an array of detected objects
    objects = []
    for index, value in enumerate(classes[0]):
        score = scores[0, index]
        if score > 0.1:
            box = boxes[0, index].tolist()
            # scale the normalized box back to full frame coordinates
            box[0] = (box[0] * region_size) + region_y_offset
            box[1] = (box[1] * region_size) + region_x_offset
            box[2] = (box[2] * region_size) + region_y_offset
            box[3] = (box[3] * region_size) + region_x_offset
            objects += [value, scores[0, index]] + box
        # only keep the first 10 objects
        if len(objects) == 60:
            break

    return objects
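
# The flat list returned above (and mirrored into each region's shared
# output_array) packs up to 10 detections as 6 consecutive values each:
#   [class_id, score, ymin, xmin, ymax, xmax, class_id, score, ...]
# which is why ObjectParser below walks the array in strides of 6 and stops
# at index 60.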

class ObjectParser(threading.Thread):
    def __init__(self, object_arrays):
        threading.Thread.__init__(self)
        self._object_arrays = object_arrays

    def run(self):
        global DETECTED_OBJECTS
        while True:
            detected_objects = []
            for object_array in self._object_arrays:
                object_index = 0
                while object_index < 60 and object_array[object_index] > 0:
                    object_class = object_array[object_index]
                    detected_objects.append({
                        'name': str(category_index.get(object_class).get('name')),
                        'score': object_array[object_index+1],
                        'ymin': int(object_array[object_index+2]),
                        'xmin': int(object_array[object_index+3]),
                        'ymax': int(object_array[object_index+4]),
                        'xmax': int(object_array[object_index+5])
                    })
                    object_index += 6
            DETECTED_OBJECTS = detected_objects
            time.sleep(0.1)

class MqttPublisher(threading.Thread):
    def __init__(self, host, motion_topic, object_topic, object_classes, motion_flags):
        threading.Thread.__init__(self)
        self.client = mqtt.Client()
        self.client.connect(host, 1883, 60)
        self.client.loop_start()
        self.motion_topic = motion_topic
        self.object_topic = object_topic
        self.object_classes = object_classes
        self.motion_flags = motion_flags

    def run(self):
        global DETECTED_OBJECTS
        last_sent_payload = ""
        last_motion = ""
        while True:
            # initialize the payload
            payload = {}
            for obj in self.object_classes:
                payload[obj] = []
            # loop over detected objects and populate the payload
            detected_objects = DETECTED_OBJECTS.copy()
            for obj in detected_objects:
                if obj['name'] in self.object_classes:
                    payload[obj['name']].append(obj)
            # publish the object payload only when it changes
            new_payload = json.dumps(payload, sort_keys=True)
            if new_payload != last_sent_payload:
                last_sent_payload = new_payload
                self.client.publish(self.object_topic, new_payload, retain=False)
            # publish the motion status only when it changes
            motion_status = 'OFF'
            if any(obj.value == 1 for obj in self.motion_flags):
                motion_status = 'ON'
            if motion_status != last_motion:
                last_motion = motion_status
                self.client.publish(self.motion_topic, motion_status, retain=False)
            time.sleep(0.1)
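
# For example, with MQTT_OBJECT_CLASSES="person,car" a published object
# payload would look roughly like this (hypothetical values):
#   {"car": [], "person": [{"name": "person", "score": 0.87, "xmax": 950,
#                           "xmin": 521, "ymax": 694, "ymin": 300}]}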

def main():
    # Parse selected regions
    regions = []
    for region_string in REGIONS.split(':'):
        region_parts = region_string.split(',')
        regions.append({
            'size': int(region_parts[0]),
            'x_offset': int(region_parts[1]),
            'y_offset': int(region_parts[2]),
            'min_object_size': int(region_parts[3])
        })

    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(RTSP_URL)
    ret, frame = video.read()
    if ret:
        frame_shape = frame.shape
    else:
        print("Unable to capture video stream")
        exit(1)
    video.release()

    shared_memory_objects = []
    for region in regions:
        shared_memory_objects.append({
            # shared value for signaling to the capture process that we are ready for the next frame
            # (1 for ready, 0 for not ready)
            'ready_for_frame': mp.Value('i', 1),
            # shared value for the motion detection signal (1 for motion, 0 for no motion)
            'motion_detected': mp.Value('i', 0),
            # create shared array for storing 10 detected objects
            # note: this must be a double even though the value you are storing
            # is a float. otherwise it stops updating the value in shared
            # memory. probably something to do with the size of the memory block
            'output_array': mp.Array(ctypes.c_double, 6*10)
        })

    # compute the flattened array length from the frame shape
    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
    # create shared array for storing the full frame image data
    shared_arr = mp.Array(ctypes.c_uint16, flat_array_length)
    # create shared value for storing the frame_time
    shared_frame_time = mp.Value('d', 0.0)
    # shape current frame so it can be treated as an image
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)

    capture_process = mp.Process(target=fetch_frames,
        args=(shared_arr, shared_frame_time,
              [obj['ready_for_frame'] for obj in shared_memory_objects],
              frame_shape))
    capture_process.daemon = True

    detection_processes = []
    for index, region in enumerate(regions):
        detection_process = mp.Process(target=process_frames,
            args=(shared_arr,
                  shared_memory_objects[index]['output_array'],
                  shared_frame_time,
                  shared_memory_objects[index]['motion_detected'],
                  frame_shape,
                  region['size'], region['x_offset'], region['y_offset']))
        detection_process.daemon = True
        detection_processes.append(detection_process)

    motion_processes = []
    for index, region in enumerate(regions):
        motion_process = mp.Process(target=detect_motion,
            args=(shared_arr,
                  shared_frame_time,
                  shared_memory_objects[index]['ready_for_frame'],
                  shared_memory_objects[index]['motion_detected'],
                  frame_shape,
                  region['size'], region['x_offset'], region['y_offset'],
                  region['min_object_size']))
        motion_process.daemon = True
        motion_processes.append(motion_process)

    object_parser = ObjectParser([obj['output_array'] for obj in shared_memory_objects])
    object_parser.start()

    mqtt_publisher = MqttPublisher(MQTT_HOST, MQTT_MOTION_TOPIC, MQTT_OBJECT_TOPIC,
                                   MQTT_OBJECT_CLASSES.split(','),
                                   [obj['motion_detected'] for obj in shared_memory_objects])
    mqtt_publisher.start()

    capture_process.start()
    print("capture_process pid ", capture_process.pid)
    for detection_process in detection_processes:
        detection_process.start()
        print("detection_process pid ", detection_process.pid)
    for motion_process in motion_processes:
        motion_process.start()
        print("motion_process pid ", motion_process.pid)

    app = Flask(__name__)

    @app.route('/')
    def index():
        # return a multipart response
        return Response(imagestream(),
                        mimetype='multipart/x-mixed-replace; boundary=frame')

    def imagestream():
        global DETECTED_OBJECTS
        while True:
            # max out at 5 FPS
            time.sleep(0.2)
            # make a copy of the current detected objects
            detected_objects = DETECTED_OBJECTS.copy()
            # make a copy of the current frame, cast to uint8 since the shared
            # array is 16-bit and drawing/JPEG encoding expect 8-bit images
            frame = frame_arr.copy().astype('uint8')
            # convert to RGB for drawing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # draw the bounding boxes on the screen
            for obj in detected_objects:
                vis_util.draw_bounding_box_on_image_array(frame,
                    obj['ymin'],
                    obj['xmin'],
                    obj['ymax'],
                    obj['xmax'],
                    color='red',
                    thickness=2,
                    display_str_list=["{}: {}%".format(obj['name'], int(obj['score']*100))],
                    use_normalized_coordinates=False)
            # draw the region boxes
            for region in regions:
                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                    (region['x_offset']+region['size'], region['y_offset']+region['size']),
                    (255,255,255), 2)
            # print the motion status in the corner
            motion_status = 'No Motion'
            if any(obj['motion_detected'].value == 1 for obj in shared_memory_objects):
                motion_status = 'Motion'
            cv2.putText(frame, motion_status, (10, 20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
            # convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # encode the image into a jpg
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    app.run(host='0.0.0.0', debug=False)
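
    # app.run blocks until the Flask server exits, so the joins below only
    # run once the web server shuts down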
    capture_process.join()
    for detection_process in detection_processes:
        detection_process.join()
    for motion_process in motion_processes:
        motion_process.join()
    object_parser.join()
    mqtt_publisher.join()

# convert shared memory array into numpy array
def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj(), dtype=np.uint16)
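
# note: np.frombuffer wraps the shared buffer without copying, so writes made
# through a reshaped view in one process are visible to every other process
# holding the same mp.Array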

# fetch the frames as fast as possible, only decoding the frames when a
# detection process has consumed the current frame
def fetch_frames(shared_arr, shared_frame_time, ready_for_frame_flags, frame_shape):
    # convert shared memory array into numpy and shape into image array
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # start the video capture
    video = cv2.VideoCapture(RTSP_URL)
    # keep the buffer small so we minimize old data
    video.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    while True:
        # grab the frame, but don't decode it yet
        ret = video.grab()
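        # note: grab() advances the stream without decoding; retrieve() below
        # decodes only the most recently grabbed frame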
        # snapshot the time the frame was grabbed
        frame_time = datetime.datetime.now()
        if ret:
            # if anyone is ready for the next frame, decode it;
            # otherwise skip this frame and move on to the next one
            if any(flag.value == 1 for flag in ready_for_frame_flags):
                # go ahead and decode the current frame
                ret, frame = video.retrieve()
                if ret:
                    arr[:] = frame
                    # signal to the detection processes by setting the shared_frame_time
                    shared_frame_time.value = frame_time.timestamp()
                    # mark every region as not ready until it asks for another frame
                    for flag in ready_for_frame_flags:
                        flag.value = 0
        else:
            # sleep a little to reduce CPU usage
            time.sleep(0.1)

    video.release()

# do the actual object detection
def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_motion, frame_shape, region_size, region_x_offset, region_y_offset):
    # shape shared input array into frame for processing
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # Load a (frozen) Tensorflow model into memory before the processing loop
    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')
    sess = tf.Session(graph=detection_graph)

    no_frames_available = -1
    frame_time = 0.0
    while True:
        now = datetime.datetime.now().timestamp()

        # if there is no motion detected, skip the expensive detection
        if shared_motion.value == 0:
            time.sleep(0.1)
            continue

        # if there isn't a new frame ready for processing
        if shared_frame_time.value == frame_time:
            # save the first time there were no frames available
            if no_frames_available == -1:
                no_frames_available = now
            # if there haven't been any frames available in 30 seconds,
            # sleep to avoid using so much cpu if the camera feed is down
            if no_frames_available > 0 and (now - no_frames_available) > 30:
                time.sleep(1)
                print("sleeping because no frames have been available in a while")
            else:
                # rest a little bit to avoid maxing out the CPU
                time.sleep(0.1)
            continue

        # we got a valid frame, so reset the timer
        no_frames_available = -1

        # if the frame is more than 0.5 second old, ignore it
        if (now - shared_frame_time.value) > 0.5:
            # rest a little bit to avoid maxing out the CPU
            time.sleep(0.1)
            continue

        # make a copy of the cropped frame as uint8 for the model
        cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy().astype('uint8')
        frame_time = shared_frame_time.value

        # convert to RGB
        cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
        # do the object detection
        objects = detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset)
        # copy the detected objects to the output array, padding the array when needed
        shared_output_arr[:] = objects + [0.0] * (60-len(objects))

# do the motion detection
def detect_motion(shared_arr, shared_frame_time, ready_for_frame, shared_motion, frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area):
    # shape shared input array into frame for processing
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    no_frames_available = -1
    avg_frame = None
    last_motion = -1
    frame_time = 0.0
    while True:
        now = datetime.datetime.now().timestamp()

        # if it has been 30 seconds since the last motion, clear the flag
        if last_motion > 0 and (now - last_motion) > 30:
            last_motion = -1
            shared_motion.value = 0

        # if there isn't a frame ready for processing
        if shared_frame_time.value == frame_time:
            # save the first time there were no frames available
            if no_frames_available == -1:
                no_frames_available = now
            # if there haven't been any frames available in 30 seconds,
            # sleep to avoid using so much cpu if the camera feed is down
            if no_frames_available > 0 and (now - no_frames_available) > 30:
                time.sleep(1)
                print("sleeping because no frames have been available in a while")
            else:
                # rest a little bit to avoid maxing out the CPU
                time.sleep(0.1)
            if ready_for_frame.value == 0:
                ready_for_frame.value = 1
            continue

        # we got a valid frame, so reset the timer
        no_frames_available = -1

        # if the frame is more than 0.5 second old, discard it
        if (now - shared_frame_time.value) > 0.5:
            # signal that we need a new frame
            ready_for_frame.value = 1
            # rest a little bit to avoid maxing out the CPU
            time.sleep(0.1)
            continue

        # make a copy of the cropped frame
        cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy().astype('uint8')
        frame_time = shared_frame_time.value
        # signal that the frame has been used so a new one will be ready
        ready_for_frame.value = 1

        # convert to grayscale
        gray = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2GRAY)
        # apply gaussian blur
        gray = cv2.GaussianBlur(gray, (21, 21), 0)

        if avg_frame is None:
            avg_frame = gray.copy().astype("float")
            continue

        # look at the delta from the avg_frame
        cv2.accumulateWeighted(gray, avg_frame, 0.5)
        frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg_frame))
        thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]

        # dilate the thresholded image to fill in holes, then find contours
        # on thresholded image
        thresh = cv2.dilate(thresh, None, iterations=2)
        cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
            cv2.CHAIN_APPROX_SIMPLE)
        cnts = imutils.grab_contours(cnts)

        # loop over the contours
        for c in cnts:
            # if the contour is big enough, report motion
            if cv2.contourArea(c) > min_motion_area:
                last_motion = now
                shared_motion.value = 1
                break

if __name__ == '__main__':
    mp.freeze_support()
    main()
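
# Example invocation (illustrative values; the env vars are the ones read at
# the top of this file):
#   RTSP_URL="rtsp://user:pass@camera:554/stream" \
#   MQTT_HOST="mqtt.local" \
#   MQTT_MOTION_TOPIC="cameras/1/motion" \
#   MQTT_OBJECT_TOPIC="cameras/1/objects" \
#   MQTT_OBJECT_CLASSES="person,car" \
#   REGIONS="350,0,300,50:400,350,250,50" \
#   python detect_objects.py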