# detect_objects.py

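"""Detect objects and motion in an RTSP camera stream and publish the results over MQTT.

A capture process copies decoded frames into shared memory. For each configured
region, a motion detection process gates an object detection process (a frozen
TensorFlow detection graph); detected objects are pushed onto a queue, collected
into DETECTED_OBJECTS, published over MQTT, and drawn onto an MJPEG stream
served by Flask at '/'.

Configuration comes from the environment: RTSP_URL, MQTT_HOST, MQTT_TOPIC_PREFIX,
MQTT_OBJECT_CLASSES, REGIONS, DEBUG.
"""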
import os
import cv2
import imutils
import time
import datetime
import ctypes
import logging
import multiprocessing as mp
import threading
import json
from contextlib import closing
import numpy as np
import tensorflow as tf
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response
import paho.mqtt.client as mqtt

RTSP_URL = os.getenv('RTSP_URL')

# Path to frozen detection graph. This is the actual model that is used for the object detection.
PATH_TO_CKPT = '/frozen_inference_graph.pb'
# List of the strings that are used to add the correct label for each box.
PATH_TO_LABELS = '/label_map.pbtext'

MQTT_HOST = os.getenv('MQTT_HOST')
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX')
MQTT_OBJECT_CLASSES = os.getenv('MQTT_OBJECT_CLASSES')

# TODO: make dynamic?
NUM_CLASSES = 90

# REGIONS = "350,0,300,50:400,350,250,50:400,750,250,50"
# REGIONS = "400,350,250,50"
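# REGIONS is a ':'-delimited list of regions, each a comma-separated entry.
# Based on the parsing in main() below, the fields per region are:
#   size,x_offset,y_offset,min_person_area,min_object_size,mask_filename
# e.g. "300,350,250,5000,100,mask-1.bmp" (example values and mask filename are
# hypothetical); the mask image is read from /config/<mask_filename>.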
REGIONS = os.getenv('REGIONS')
DETECTED_OBJECTS = []

DEBUG = (os.getenv('DEBUG') == '1')

# Loading label map
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES,
                                                            use_display_name=True)
category_index = label_map_util.create_category_index(categories)

def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug):
    # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
    image_np_expanded = np.expand_dims(cropped_frame, axis=0)
    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
    # Each box represents a part of the image where a particular object was detected.
    boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
    # Each score represents the level of confidence for each of the objects.
    # The score is shown on the result image, together with the class label.
    scores = detection_graph.get_tensor_by_name('detection_scores:0')
    classes = detection_graph.get_tensor_by_name('detection_classes:0')
    num_detections = detection_graph.get_tensor_by_name('num_detections:0')
    # Actual detection.
    (boxes, scores, classes, num_detections) = sess.run(
        [boxes, scores, classes, num_detections],
        feed_dict={image_tensor: image_np_expanded})

    if debug:
        if len([value for index, value in enumerate(classes[0]) if str(category_index.get(value).get('name')) == 'person' and scores[0, index] > 0.5]) > 0:
            vis_util.visualize_boxes_and_labels_on_image_array(
                cropped_frame,
                np.squeeze(boxes),
                np.squeeze(classes).astype(np.int32),
                np.squeeze(scores),
                category_index,
                use_normalized_coordinates=True,
                line_thickness=4)
            cv2.imwrite("/lab/debug/obj-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)

    # build an array of detected objects
    objects = []
    for index, value in enumerate(classes[0]):
        score = scores[0, index]
        if score > 0.5:
            box = boxes[0, index].tolist()
            # boxes are normalized to [0, 1]; scale by the region size and shift
            # by the region offsets to get full-frame pixel coordinates
            objects.append({
                'name': str(category_index.get(value).get('name')),
                'score': float(score),
                'ymin': int((box[0] * region_size) + region_y_offset),
                'xmin': int((box[1] * region_size) + region_x_offset),
                'ymax': int((box[2] * region_size) + region_y_offset),
                'xmax': int((box[3] * region_size) + region_x_offset)
            })

    return objects

class ObjectParser(threading.Thread):
    def __init__(self, object_queue, objects_parsed):
        threading.Thread.__init__(self)
        self._object_queue = object_queue
        self._objects_parsed = objects_parsed

    def run(self):
        global DETECTED_OBJECTS
        while True:
            obj = self._object_queue.get()
            DETECTED_OBJECTS.append(obj)

            # notify that objects were parsed
            with self._objects_parsed:
                self._objects_parsed.notify_all()

class ObjectCleaner(threading.Thread):
    def __init__(self, objects_parsed):
        threading.Thread.__init__(self)
        self._objects_parsed = objects_parsed

    def run(self):
        global DETECTED_OBJECTS
        while True:
            # expire the objects that are more than 1 second old
            now = datetime.datetime.now().timestamp()
            # look for the first object found within the last second
            # (newest objects are appended to the end)
            detected_objects = DETECTED_OBJECTS.copy()
            num_to_delete = 0
            for obj in detected_objects:
                if now - obj['frame_time'] < 1:
                    break
                num_to_delete += 1
            if num_to_delete > 0:
                del DETECTED_OBJECTS[:num_to_delete]

                # notify that parsed objects were changed
                with self._objects_parsed:
                    self._objects_parsed.notify_all()

            # wait a bit before checking for more expired frames
            time.sleep(0.2)
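
# MQTT publishers: the script announces itself on <MQTT_TOPIC_PREFIX>/available
# ('online'/'offline' via the last will), reports the combined motion state on
# <MQTT_TOPIC_PREFIX>/motion ('ON'/'OFF'), and reports the averaged person score
# as JSON on <MQTT_TOPIC_PREFIX>/objects.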

class MqttMotionPublisher(threading.Thread):
    def __init__(self, client, topic_prefix, motion_changed, motion_flags):
        threading.Thread.__init__(self)
        self.client = client
        self.topic_prefix = topic_prefix
        self.motion_changed = motion_changed
        self.motion_flags = motion_flags

    def run(self):
        last_sent_motion = ""
        while True:
            with self.motion_changed:
                self.motion_changed.wait()

            # send message for motion
            motion_status = 'OFF'
            if any(obj.is_set() for obj in self.motion_flags):
                motion_status = 'ON'

            if last_sent_motion != motion_status:
                last_sent_motion = motion_status
                self.client.publish(self.topic_prefix + '/motion', motion_status, retain=False)

class MqttObjectPublisher(threading.Thread):
    def __init__(self, client, topic_prefix, objects_parsed, object_classes):
        threading.Thread.__init__(self)
        self.client = client
        self.topic_prefix = topic_prefix
        self.objects_parsed = objects_parsed
        self.object_classes = object_classes

    def run(self):
        global DETECTED_OBJECTS
        last_sent_payload = ""
        while True:
            # initialize the payload
            payload = {}

            # wait until objects have been parsed
            with self.objects_parsed:
                self.objects_parsed.wait()

            # add up all the person scores in the detected objects and
            # average over the past 1 second (5 fps)
            detected_objects = DETECTED_OBJECTS.copy()
            avg_person_score = sum([obj['score'] for obj in detected_objects if obj['name'] == 'person']) / 5
            payload['person'] = int(avg_person_score * 100)

            # send message for objects if different
            new_payload = json.dumps(payload, sort_keys=True)
            if new_payload != last_sent_payload:
                last_sent_payload = new_payload
                self.client.publish(self.topic_prefix + '/objects', new_payload, retain=False)

def main():
    # Parse selected regions
    regions = []
    for region_string in REGIONS.split(':'):
        region_parts = region_string.split(',')
        region_mask_image = cv2.imread("/config/{}".format(region_parts[5]), cv2.IMREAD_GRAYSCALE)
        region_mask = np.where(region_mask_image == [0])
        regions.append({
            'size': int(region_parts[0]),
            'x_offset': int(region_parts[1]),
            'y_offset': int(region_parts[2]),
            'min_person_area': int(region_parts[3]),
            'min_object_size': int(region_parts[4]),
            'mask': region_mask,
            # Event for motion detection signaling
            'motion_detected': mp.Event(),
            # create shared array for storing 10 detected objects
            # note: this must be a double even though the value you are storing
            # is a float. otherwise it stops updating the value in shared
            # memory. probably something to do with the size of the memory block
            'output_array': mp.Array(ctypes.c_double, 6 * 10)
        })

    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(RTSP_URL)
    ret, frame = video.read()
    if ret:
        frame_shape = frame.shape
    else:
        print("Unable to capture video stream")
        exit(1)
    video.release()

    # compute the flattened array length from the array shape
    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
    # create shared array for storing the full frame image data
    shared_arr = mp.Array(ctypes.c_uint16, flat_array_length)
    # create shared value for storing the frame_time
    shared_frame_time = mp.Value('d', 0.0)
    # Lock to control access to the frame while writing
    frame_lock = mp.Lock()
    # Condition for notifying that a new frame is ready
    frame_ready = mp.Condition()
    # Condition for notifying that motion status changed globally
    motion_changed = mp.Condition()
    # Condition for notifying that objects were parsed
    objects_parsed = mp.Condition()
    # Queue for detected objects
    object_queue = mp.Queue()

    # shape current frame so it can be treated as an image
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)

    capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
        shared_frame_time, frame_lock, frame_ready, frame_shape))
    capture_process.daemon = True

    detection_processes = []
    motion_processes = []
    for region in regions:
        detection_process = mp.Process(target=process_frames, args=(shared_arr,
            object_queue,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            region['min_person_area'],
            DEBUG))
        detection_process.daemon = True
        detection_processes.append(detection_process)

        motion_process = mp.Process(target=detect_motion, args=(shared_arr,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            motion_changed,
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            region['min_object_size'], region['mask'],
            DEBUG))
        motion_process.daemon = True
        motion_processes.append(motion_process)

    object_parser = ObjectParser(object_queue, objects_parsed)
    object_parser.start()

    object_cleaner = ObjectCleaner(objects_parsed)
    object_cleaner.start()

    client = mqtt.Client()
    client.will_set(MQTT_TOPIC_PREFIX + '/available', payload='offline', qos=1, retain=True)
    client.connect(MQTT_HOST, 1883, 60)
    client.loop_start()
    client.publish(MQTT_TOPIC_PREFIX + '/available', 'online', retain=True)

    mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed,
        MQTT_OBJECT_CLASSES.split(','))
    mqtt_publisher.start()

    mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed,
        [region['motion_detected'] for region in regions])
    mqtt_motion_publisher.start()

    capture_process.start()
    print("capture_process pid ", capture_process.pid)
    for detection_process in detection_processes:
        detection_process.start()
        print("detection_process pid ", detection_process.pid)
    for motion_process in motion_processes:
        motion_process.start()
        print("motion_process pid ", motion_process.pid)

    app = Flask(__name__)

    @app.route('/')
    def index():
        # return a multipart response
        return Response(imagestream(),
            mimetype='multipart/x-mixed-replace; boundary=frame')

    def imagestream():
        global DETECTED_OBJECTS
        while True:
            # max out at 5 FPS
            time.sleep(0.2)
            # make a copy of the current detected objects
            detected_objects = DETECTED_OBJECTS.copy()
            # lock and make a copy of the current frame
            with frame_lock:
                frame = frame_arr.copy()
            # convert to RGB for drawing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # draw the bounding boxes on the screen
            for obj in detected_objects:
                vis_util.draw_bounding_box_on_image_array(frame,
                    obj['ymin'],
                    obj['xmin'],
                    obj['ymax'],
                    obj['xmax'],
                    color='red',
                    thickness=2,
                    display_str_list=["{}: {}%".format(obj['name'], int(obj['score'] * 100))],
                    use_normalized_coordinates=False)

            for region in regions:
                color = (255, 255, 255)
                if region['motion_detected'].is_set():
                    color = (0, 255, 0)
                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                    (region['x_offset'] + region['size'], region['y_offset'] + region['size']),
                    color, 2)

            # convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # encode the image into a jpg
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    app.run(host='0.0.0.0', debug=False)

    capture_process.join()
    for detection_process in detection_processes:
        detection_process.join()
    for motion_process in motion_processes:
        motion_process.join()
    object_parser.join()
    object_cleaner.join()
    mqtt_publisher.join()

# convert shared memory array into numpy array
def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj(), dtype=np.uint16)
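
# note: the np.uint16 dtype above must match the ctypes.c_uint16 type used when
# the shared frame array is allocated in main(); a mismatch would reinterpret
# the raw buffer and corrupt the frame data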

# fetch the frames as fast as possible, only decoding the frames when the
# detection_process has consumed the current frame
def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_shape):
    # convert shared memory array into numpy and shape into image array
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # start the video capture
    video = cv2.VideoCapture()
    video.open(RTSP_URL)
    # keep the buffer small so we minimize old data
    video.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    while True:
        # check if the video stream is still open, and reopen if needed
        if not video.isOpened():
            success = video.open(RTSP_URL)
            if not success:
                time.sleep(1)
                continue
        # grab the frame, but dont decode it yet
        ret = video.grab()
        # snapshot the time the frame was grabbed
        frame_time = datetime.datetime.now()
        if ret:
            # go ahead and decode the current frame
            ret, frame = video.retrieve()
            if ret:
                # Lock access and update frame
                with frame_lock:
                    arr[:] = frame
                    shared_frame_time.value = frame_time.timestamp()
                # Notify with the condition that a new frame is ready
                with frame_ready:
                    frame_ready.notify_all()

    video.release()

# do the actual object detection
def process_frames(shared_arr, object_queue, shared_frame_time, frame_lock, frame_ready,
                   motion_detected, frame_shape, region_size, region_x_offset, region_y_offset,
                   min_person_area, debug):
    # shape shared input array into frame for processing
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # Load a (frozen) Tensorflow model into memory before the processing loop
    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')
        sess = tf.Session(graph=detection_graph)

    frame_time = 0.0
    while True:
        now = datetime.datetime.now().timestamp()

        # wait until motion is detected
        motion_detected.wait()

        with frame_ready:
            # if there isnt a frame ready for processing or it is old, wait for a signal
            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
                frame_ready.wait()

        # make a copy of the cropped frame
        with frame_lock:
            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy()
            frame_time = shared_frame_time.value

        # convert to RGB
        cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
        # do the object detection
        objects = detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug)
        for obj in objects:
            # ignore persons below the size threshold
            if obj['name'] == 'person' and (obj['xmax'] - obj['xmin']) * (obj['ymax'] - obj['ymin']) < min_person_area:
                continue
            obj['frame_time'] = frame_time
            object_queue.put(obj)

# do the actual motion detection
def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed,
                  frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, mask, debug):
    # shape shared input array into frame for processing
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    avg_frame = None
    avg_delta = None
    frame_time = 0.0
    motion_frames = 0
    while True:
        now = datetime.datetime.now().timestamp()

        with frame_ready:
            # if there isnt a frame ready for processing or it is old, wait for a signal
            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
                frame_ready.wait()

        # lock and make a copy of the cropped frame
        with frame_lock:
            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy().astype('uint8')
            frame_time = shared_frame_time.value

        # convert to grayscale
        gray = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2GRAY)

        # apply image mask to remove areas from motion detection
        gray[mask] = [255]

        # apply gaussian blur
        gray = cv2.GaussianBlur(gray, (21, 21), 0)

        if avg_frame is None:
            avg_frame = gray.copy().astype("float")
            continue

        # look at the delta from the avg_frame
        frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg_frame))

        if avg_delta is None:
            avg_delta = frameDelta.copy().astype("float")

        # compute the average delta over the past few frames
        # the alpha value can be modified to configure how sensitive the motion detection is.
        # higher values mean the current frame impacts the delta a lot, and a single raindrop may
        # register as motion, too low and a fast moving person wont be detected as motion
        # this also assumes that a person is in the same location across more than a single frame
        cv2.accumulateWeighted(frameDelta, avg_delta, 0.2)

        # compute the threshold image for the current frame
        current_thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]

        # black out everything in the avg_delta where there isnt motion in the current frame
        avg_delta_image = cv2.convertScaleAbs(avg_delta)
        avg_delta_image[np.where(current_thresh == [0])] = [0]

        # then look for deltas above the threshold, but only in areas where there is a delta
        # in the current frame. this prevents deltas from previous frames from being included
        thresh = cv2.threshold(avg_delta_image, 25, 255, cv2.THRESH_BINARY)[1]

        # dilate the thresholded image to fill in holes, then find contours
        # on thresholded image
        thresh = cv2.dilate(thresh, None, iterations=2)
        cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        cnts = imutils.grab_contours(cnts)

        # if there are no contours, there is no motion
        if len(cnts) < 1:
            motion_frames = 0
            continue

        motion_found = False

        # loop over the contours
        for c in cnts:
            # if the contour is big enough, count it as motion
            contour_area = cv2.contourArea(c)
            if contour_area > min_motion_area:
                motion_found = True
                if debug:
                    cv2.drawContours(cropped_frame, [c], -1, (0, 255, 0), 2)
                    x, y, w, h = cv2.boundingRect(c)
                    cv2.putText(cropped_frame, str(contour_area), (x, y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 0), 2)
                else:
                    break

        if motion_found:
            motion_frames += 1
            # if there have been enough consecutive motion frames, report motion
            if motion_frames >= 3:
                # only average in the current frame if the difference persists for at least 3 frames
                cv2.accumulateWeighted(gray, avg_frame, 0.01)
                motion_detected.set()
                with motion_changed:
                    motion_changed.notify_all()
        else:
            # when no motion, just keep averaging the frames together
            cv2.accumulateWeighted(gray, avg_frame, 0.01)
            motion_frames = 0
            if motion_detected.is_set():
                motion_detected.clear()
                with motion_changed:
                    motion_changed.notify_all()

        if debug and motion_frames == 3:
            cv2.imwrite("/lab/debug/motion-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)
            cv2.imwrite("/lab/debug/avg_delta-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), avg_delta_image)

if __name__ == '__main__':
    mp.freeze_support()
    main()