import os
import cv2
import imutils
import time
import datetime
import ctypes
import logging
import multiprocessing as mp
import threading
import json
from contextlib import closing
import numpy as np
import tensorflow as tf
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
from flask import Flask, Response, make_response
import paho.mqtt.client as mqtt
RTSP_URL = os.getenv('RTSP_URL')

# Path to the frozen detection graph. This is the actual model that is used for the object detection.
PATH_TO_CKPT = '/frozen_inference_graph.pb'
# List of the strings that are used to add the correct label for each box.
PATH_TO_LABELS = '/label_map.pbtext'

MQTT_HOST = os.getenv('MQTT_HOST')
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX')
MQTT_OBJECT_CLASSES = os.getenv('MQTT_OBJECT_CLASSES')

# TODO: make dynamic?
NUM_CLASSES = 90

# REGIONS = "350,0,300,50:400,350,250,50:400,750,250,50"
# REGIONS = "400,350,250,50"
REGIONS = os.getenv('REGIONS')
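# Each region is "size,x_offset,y_offset,min_object_size", colon-delimited,
# as in the examples above (see the parsing in main() below).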

DETECTED_OBJECTS = []

# Loading label map
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES,
                                                            use_display_name=True)
category_index = label_map_util.create_category_index(categories)
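# category_index maps integer class ids to label dicts, e.g. {'id': 1, 'name': 'person'}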

def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug):
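    """Run the detection graph on a cropped region and return a flat list of
    up to 10 detections as [class, score, ymin, xmin, ymax, xmax, ...], with
    box coordinates translated from the region back into full-frame pixels."""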
    # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
    image_np_expanded = np.expand_dims(cropped_frame, axis=0)
    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
    # Each box represents a part of the image where a particular object was detected.
    boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
    # Each score represents the level of confidence for each of the objects.
    # The score is shown on the result image, together with the class label.
    scores = detection_graph.get_tensor_by_name('detection_scores:0')
    classes = detection_graph.get_tensor_by_name('detection_classes:0')
    num_detections = detection_graph.get_tensor_by_name('num_detections:0')

    # Actual detection.
    (boxes, scores, classes, num_detections) = sess.run(
        [boxes, scores, classes, num_detections],
        feed_dict={image_tensor: image_np_expanded})

    if debug:
        if len([category_index.get(value) for index, value in enumerate(classes[0]) if scores[0, index] > 0.5]) > 0:
            vis_util.visualize_boxes_and_labels_on_image_array(
                cropped_frame,
                np.squeeze(boxes),
                np.squeeze(classes).astype(np.int32),
                np.squeeze(scores),
                category_index,
                use_normalized_coordinates=True,
                line_thickness=4)
            cv2.imwrite("/lab/debug/obj-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)

    # build a flat array of detected objects
    objects = []
    for index, value in enumerate(classes[0]):
        score = scores[0, index]
        if score > 0.5:
            box = boxes[0, index].tolist()
            # scale the normalized box coordinates back into full-frame pixels
            box[0] = (box[0] * region_size) + region_y_offset
            box[1] = (box[1] * region_size) + region_x_offset
            box[2] = (box[2] * region_size) + region_y_offset
            box[3] = (box[3] * region_size) + region_x_offset
            objects += [value, scores[0, index]] + box
        # only keep the first 10 objects (6 values each)
        if len(objects) == 60:
            break

    return objects

class ObjectParser(threading.Thread):
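    """Converts the flat shared output arrays written by the detection
    processes into the global DETECTED_OBJECTS list of dicts."""
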
    def __init__(self, objects_changed, objects_parsed, object_arrays):
        threading.Thread.__init__(self)
        self._objects_changed = objects_changed
        self._objects_parsed = objects_parsed
        self._object_arrays = object_arrays

    def run(self):
        global DETECTED_OBJECTS
        while True:
            detected_objects = []

            # wait until object detection has run
            # TODO: what if something else changed while I was processing???
            with self._objects_changed:
                self._objects_changed.wait()

            for object_array in self._object_arrays:
                object_index = 0
                while object_index < 60 and object_array[object_index] > 0:
                    # the class id is stored as a double in shared memory
                    object_class = int(object_array[object_index])
                    detected_objects.append({
                        'name': str(category_index.get(object_class).get('name')),
                        'score': object_array[object_index+1],
                        'ymin': int(object_array[object_index+2]),
                        'xmin': int(object_array[object_index+3]),
                        'ymax': int(object_array[object_index+4]),
                        'xmax': int(object_array[object_index+5])
                    })
                    object_index += 6

            DETECTED_OBJECTS = detected_objects

            # notify that objects were parsed
            with self._objects_parsed:
                self._objects_parsed.notify_all()

class MqttMotionPublisher(threading.Thread):
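    """Publishes an aggregated ON/OFF motion status over MQTT whenever any
    region's motion flag changes."""
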
    def __init__(self, client, topic_prefix, motion_changed, motion_flags):
        threading.Thread.__init__(self)
        self.client = client
        self.topic_prefix = topic_prefix
        self.motion_changed = motion_changed
        self.motion_flags = motion_flags

    def run(self):
        last_sent_motion = ""
        while True:
            with self.motion_changed:
                self.motion_changed.wait()

            # send message for motion
            motion_status = 'OFF'
            if any(obj.is_set() for obj in self.motion_flags):
                motion_status = 'ON'

            if last_sent_motion != motion_status:
                last_sent_motion = motion_status
                self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False)

class MqttObjectPublisher(threading.Thread):
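    """Publishes the currently detected objects as a JSON payload over MQTT
    whenever the payload changes."""
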
    def __init__(self, client, topic_prefix, objects_parsed, object_classes):
        threading.Thread.__init__(self)
        self.client = client
        self.topic_prefix = topic_prefix
        self.objects_parsed = objects_parsed
        self.object_classes = object_classes

    def run(self):
        global DETECTED_OBJECTS
        last_sent_payload = ""
        while True:
            # initialize the payload with an empty list per tracked class
            payload = {}
            for obj in self.object_classes:
                payload[obj] = []

            # wait until objects have been parsed
            with self.objects_parsed:
                self.objects_parsed.wait()

            # loop over detected objects and populate the payload
            detected_objects = DETECTED_OBJECTS.copy()
            for obj in detected_objects:
                if obj['name'] in self.object_classes:
                    payload[obj['name']].append(obj)

            # send message for objects if different
            new_payload = json.dumps(payload, sort_keys=True)
            if new_payload != last_sent_payload:
                last_sent_payload = new_payload
                self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False)

def main():
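    """Wires everything together: parses the configured regions, starts the
    capture, detection, and motion processes plus the MQTT publisher threads,
    and serves an annotated MJPEG debug stream over HTTP."""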
    # Parse selected regions
    regions = []
    for region_string in REGIONS.split(':'):
        region_parts = region_string.split(',')
        regions.append({
            'size': int(region_parts[0]),
            'x_offset': int(region_parts[1]),
            'y_offset': int(region_parts[2]),
            'min_object_size': int(region_parts[3]),
            # Event for motion detection signaling
            'motion_detected': mp.Event(),
            # create shared array for storing 10 detected objects
            # note: this must be a double even though the value you are storing
            # is a float. otherwise it stops updating the value in shared
            # memory. probably something to do with the size of the memory block
            'output_array': mp.Array(ctypes.c_double, 6*10)
        })
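    # each output_array holds 10 records of 6 values:
    # [class, score, ymin, xmin, ymax, xmax]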

    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(RTSP_URL)
    ret, frame = video.read()
    if ret:
        frame_shape = frame.shape
    else:
        print("Unable to capture video stream")
        exit(1)
    video.release()

    # compute the flattened array length from the array shape
    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
    # create shared array for storing the full frame image data
    shared_arr = mp.Array(ctypes.c_uint16, flat_array_length)
    # create shared value for storing the frame_time
    shared_frame_time = mp.Value('d', 0.0)
    # Lock to control access to the frame while writing
    frame_lock = mp.Lock()
    # Condition for notifying that a new frame is ready
    frame_ready = mp.Condition()
    # Condition for notifying that motion status changed globally
    motion_changed = mp.Condition()
    # Condition for notifying that object detection ran
    objects_changed = mp.Condition()
    # Condition for notifying that objects were parsed
    objects_parsed = mp.Condition()
    # shape current frame so it can be treated as an image
    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)

    capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
        shared_frame_time, frame_lock, frame_ready, frame_shape))
    capture_process.daemon = True

    detection_processes = []
    motion_processes = []
    for region in regions:
        detection_process = mp.Process(target=process_frames, args=(shared_arr,
            region['output_array'],
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            objects_changed,
            frame_shape,
            region['size'], region['x_offset'], region['y_offset']))
        detection_process.daemon = True
        detection_processes.append(detection_process)

        motion_process = mp.Process(target=detect_motion, args=(shared_arr,
            shared_frame_time,
            frame_lock, frame_ready,
            region['motion_detected'],
            motion_changed,
            frame_shape,
            region['size'], region['x_offset'], region['y_offset'],
            region['min_object_size'],
            True))
        motion_process.daemon = True
        motion_processes.append(motion_process)

    object_parser = ObjectParser(objects_changed, objects_parsed, [region['output_array'] for region in regions])
    object_parser.start()

    client = mqtt.Client()
    client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
    client.connect(MQTT_HOST, 1883, 60)
    client.loop_start()
    client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)

    mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed,
                                         MQTT_OBJECT_CLASSES.split(','))
    mqtt_publisher.start()

    mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed,
                                                [region['motion_detected'] for region in regions])
    mqtt_motion_publisher.start()

    capture_process.start()
    print("capture_process pid ", capture_process.pid)
    for detection_process in detection_processes:
        detection_process.start()
        print("detection_process pid ", detection_process.pid)
    for motion_process in motion_processes:
        motion_process.start()
        print("motion_process pid ", motion_process.pid)

    app = Flask(__name__)

    @app.route('/')
    def index():
        # return a multipart response
        return Response(imagestream(),
                        mimetype='multipart/x-mixed-replace; boundary=frame')

    def imagestream():
        global DETECTED_OBJECTS
        while True:
            # max out at 5 FPS
            time.sleep(0.2)
            # make a copy of the current detected objects
            detected_objects = DETECTED_OBJECTS.copy()
            # lock and make a copy of the current frame
            with frame_lock:
                frame = frame_arr.copy()
            # the shared buffer is uint16, so convert to uint8
            # for drawing and JPEG encoding
            frame = frame.astype('uint8')
            # convert to RGB for drawing
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # draw the bounding boxes on the screen
            for obj in detected_objects:
                vis_util.draw_bounding_box_on_image_array(frame,
                    obj['ymin'],
                    obj['xmin'],
                    obj['ymax'],
                    obj['xmax'],
                    color='red',
                    thickness=2,
                    display_str_list=["{}: {}%".format(obj['name'], int(obj['score']*100))],
                    use_normalized_coordinates=False)
            for region in regions:
                color = (255, 255, 255)
                if region['motion_detected'].is_set():
                    color = (0, 255, 0)
                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                              (region['x_offset']+region['size'], region['y_offset']+region['size']),
                              color, 2)
            # convert back to BGR
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            # encode the image into a jpg
            ret, jpg = cv2.imencode('.jpg', frame)
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

    app.run(host='0.0.0.0', debug=False)

    capture_process.join()
    for detection_process in detection_processes:
        detection_process.join()
    for motion_process in motion_processes:
        motion_process.join()
    object_parser.join()
    mqtt_publisher.join()

# convert shared memory array into numpy array
def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj(), dtype=np.uint16)

# fetch the frames as fast as possible, only decoding the frames when the
# detection_process has consumed the current frame
def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_shape):
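    """Writes each decoded frame and its timestamp into shared memory under
    frame_lock, then notifies frame_ready so the workers can wake up."""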
    # convert shared memory array into numpy and shape into image array
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # start the video capture
    video = cv2.VideoCapture(RTSP_URL)
    # keep the buffer small so we minimize old data
    video.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    while True:
        # grab the frame, but don't decode it yet
        ret = video.grab()
        # snapshot the time the frame was grabbed
        frame_time = datetime.datetime.now()
        if ret:
            # go ahead and decode the current frame
            ret, frame = video.retrieve()
            if ret:
                # Lock access and update frame
                with frame_lock:
                    arr[:] = frame
                    shared_frame_time.value = frame_time.timestamp()
                # Notify with the condition that a new frame is ready
                with frame_ready:
                    frame_ready.notify_all()

    video.release()

# do the actual object detection
def process_frames(shared_arr, shared_output_arr, shared_frame_time, frame_lock, frame_ready,
                   motion_detected, objects_changed, frame_shape, region_size, region_x_offset, region_y_offset):
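    """Detection worker for a single region: waits for motion, copies the
    latest cropped frame out of shared memory, runs the model, and writes the
    flat results into the region's shared output array."""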
    debug = True
    # shape shared input array into frame for processing
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # Load a (frozen) Tensorflow model into memory before the processing loop
    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')
        sess = tf.Session(graph=detection_graph)

    frame_time = 0.0
    while True:
        now = datetime.datetime.now().timestamp()

        # wait until motion is detected
        motion_detected.wait()

        with frame_ready:
            # if there isn't a frame ready for processing or it is old, wait for a signal
            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
                frame_ready.wait()

        # make a copy of the cropped frame
        with frame_lock:
            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy()
            frame_time = shared_frame_time.value

        # convert to RGB
        cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
        # do the object detection
        objects = detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset, True)
        # copy the detected objects to the output array, padding the array when needed
        shared_output_arr[:] = objects + [0.0] * (60 - len(objects))
        with objects_changed:
            objects_changed.notify_all()

# do the actual motion detection
def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed,
                  frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, debug):
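    """Motion worker for a single region: compares each frame against a
    running-average background and sets the region's motion flag after three
    consecutive frames with a large enough contour."""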
    # shape shared input array into frame for processing
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    avg_frame = None
    last_motion = -1
    frame_time = 0.0
    motion_frames = 0
    while True:
        now = datetime.datetime.now().timestamp()

        # if it has been long enough since the last motion, clear the flag
        if last_motion > 0 and (now - last_motion) > 2:
            last_motion = -1
            motion_detected.clear()
            with motion_changed:
                motion_changed.notify_all()

        with frame_ready:
            # if there isn't a frame ready for processing or it is old, wait for a signal
            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
                frame_ready.wait()

        # lock and make a copy of the cropped frame
        with frame_lock:
            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy().astype('uint8')
            frame_time = shared_frame_time.value

        # convert to grayscale
        gray = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2GRAY)
        # apply gaussian blur
        gray = cv2.GaussianBlur(gray, (21, 21), 0)

        if avg_frame is None:
            avg_frame = gray.copy().astype("float")
            continue

        # look at the delta from the avg_frame
        cv2.accumulateWeighted(gray, avg_frame, 0.5)
        frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg_frame))
        thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]

        # dilate the thresholded image to fill in holes, then find contours
        # on the thresholded image
        thresh = cv2.dilate(thresh, None, iterations=2)
        cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
                                cv2.CHAIN_APPROX_SIMPLE)
        cnts = imutils.grab_contours(cnts)

        # if there are no contours, there is no motion
        if len(cnts) < 1:
            motion_frames = 0
            continue

        motion_found = False
        # loop over the contours
        for c in cnts:
            # if the contour is big enough, count it as motion
            contour_area = cv2.contourArea(c)
            if contour_area > min_motion_area:
                motion_found = True
                if debug:
                    cv2.drawContours(cropped_frame, [c], -1, (0, 255, 0), 2)
                    x, y, w, h = cv2.boundingRect(c)
                    cv2.putText(cropped_frame, str(contour_area), (x, y),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 0), 2)
                else:
                    break

        if motion_found:
            motion_frames += 1
            # if there have been enough consecutive motion frames, report motion
            if motion_frames >= 3:
                motion_detected.set()
                with motion_changed:
                    motion_changed.notify_all()
                last_motion = now
        else:
            motion_frames = 0

        if debug and motion_frames > 0:
            cv2.imwrite("/lab/debug/motion-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)

if __name__ == '__main__':
    mp.freeze_support()
    main()
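
# Example invocation (illustrative values; script name and all settings are
# assumptions, adjust for your camera and broker):
#   RTSP_URL="rtsp://user:pass@camera/stream" \
#   MQTT_HOST="mqtt.local" MQTT_TOPIC_PREFIX="cameras/1" \
#   MQTT_OBJECT_CLASSES="person,car" REGIONS="350,0,300,50:400,350,250,50" \
#   python detect_objects.py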