import datetime
import itertools
import queue
import threading
import time
from collections import defaultdict

import cv2
import imutils
import numpy as np
from scipy.spatial import distance as dist
import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate

def load_labels(path, encoding='utf-8'):
    """Loads labels from file (with or without index numbers).

    Args:
        path: path to label file.
        encoding: label file encoding.
    Returns:
        Dictionary mapping indices to labels.
    """
    with open(path, 'r', encoding=encoding) as f:
        lines = f.readlines()
    if not lines:
        return {}

    if lines[0].split(' ', maxsplit=1)[0].isdigit():
        pairs = [line.split(' ', maxsplit=1) for line in lines]
        return {int(index): label.strip() for index, label in pairs}
    else:
        return {index: line.strip() for index, line in enumerate(lines)}
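
# Example (hypothetical labelmap contents): a file with either
# "0 person\n1 bicycle\n..." or just "person\nbicycle\n..." yields
# {0: 'person', 1: 'bicycle', ...} in both cases.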

def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
    if color is None:
        color = (0, 0, 255)
    display_text = "{}: {}".format(label, info)
    cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, thickness)
    font_scale = 0.5
    font = cv2.FONT_HERSHEY_SIMPLEX
    # get the width and height of the text box
    size = cv2.getTextSize(display_text, font, fontScale=font_scale, thickness=2)
    text_width = size[0][0]
    text_height = size[0][1]
    line_height = text_height + size[1]
    # set the text start position
    if position == 'ul':
        text_offset_x = x_min
        text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
    elif position == 'ur':
        text_offset_x = x_max - (text_width + 8)
        text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
    elif position == 'bl':
        text_offset_x = x_min
        text_offset_y = y_max
    elif position == 'br':
        text_offset_x = x_max - (text_width + 8)
        text_offset_y = y_max
    else:
        # fall back to the upper left for an unrecognized position so the
        # offsets are always defined
        text_offset_x = x_min
        text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
    # make the coords of the box with a small padding of two pixels
    textbox_coords = ((text_offset_x, text_offset_y), (text_offset_x + text_width + 2, text_offset_y + line_height))
    cv2.rectangle(frame, textbox_coords[0], textbox_coords[1], color, cv2.FILLED)
    cv2.putText(frame, display_text, (text_offset_x, text_offset_y + line_height - 3), font, fontScale=font_scale, color=(0, 0, 0), thickness=2)

def calculate_region(frame_shape, xmin, ymin, xmax, ymax, multiplier=2):
    # size is the longest edge of the box times the multiplier
    size = int(max(xmax - xmin, ymax - ymin) * multiplier)
    # if the size is too big to fit in the frame
    if size > min(frame_shape[0], frame_shape[1]):
        size = min(frame_shape[0], frame_shape[1])

    # x_offset is the midpoint of the bounding box minus half the size
    x_offset = int((xmax - xmin) / 2.0 + xmin - size / 2.0)
    # if outside the image
    if x_offset < 0:
        x_offset = 0
    elif x_offset > (frame_shape[1] - size):
        x_offset = (frame_shape[1] - size)

    # y_offset is the midpoint of the bounding box minus half the size
    y_offset = int((ymax - ymin) / 2.0 + ymin - size / 2.0)
    # if outside the image
    if y_offset < 0:
        y_offset = 0
    elif y_offset > (frame_shape[0] - size):
        y_offset = (frame_shape[0] - size)

    return (x_offset, y_offset, x_offset + size, y_offset + size)
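
# Worked example (illustrative numbers): for a 1280x720 frame and a box of
# (100, 100, 150, 200), the longest edge is 100, so size = 200 and the
# returned region is the square (25, 50, 225, 250) centered on the box.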

def intersection(box_a, box_b):
    return (
        max(box_a[0], box_b[0]),
        max(box_a[1], box_b[1]),
        min(box_a[2], box_b[2]),
        min(box_a[3], box_b[3])
    )

def area(box):
    return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)

def intersection_over_union(box_a, box_b):
    # determine the (x, y)-coordinates of the intersection rectangle
    intersect = intersection(box_a, box_b)

    # compute the area of the intersection rectangle
    inter_area = max(0, intersect[2] - intersect[0] + 1) * max(0, intersect[3] - intersect[1] + 1)

    if inter_area == 0:
        return 0.0

    # compute the area of both the prediction and ground-truth rectangles
    box_a_area = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
    box_b_area = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)

    # compute the intersection over union by taking the intersection area and
    # dividing it by the sum of the prediction and ground-truth areas minus
    # the intersection area
    iou = inter_area / float(box_a_area + box_b_area - inter_area)

    # return the intersection over union value
    return iou
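
# Worked example: box_a = (0, 0, 9, 9) and box_b = (5, 5, 14, 14) are each
# 10x10 = 100 pixels (coordinates are inclusive) and overlap in a 5x5 = 25
# pixel square, so the IOU is 25 / (100 + 100 - 25) ~= 0.14.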

def clipped(obj, frame_shape):
    # if the object is within 5 pixels of the region border, and the region
    # is not on the edge of the frame, consider the object to be clipped
    box = obj[2]
    region = obj[3]
    if ((region[0] > 5 and box[0] - region[0] <= 5) or
        (region[1] > 5 and box[1] - region[1] <= 5) or
        (frame_shape[1] - region[2] > 5 and region[2] - box[2] <= 5) or
        (frame_shape[0] - region[3] > 5 and region[3] - box[3] <= 5)):
        return True
    else:
        return False

def filtered(obj):
    # only 'person' objects are kept for now; everything else is filtered out
    return obj[0] != 'person'

def create_tensor_input(frame, region):
    cropped_frame = frame[region[1]:region[3], region[0]:region[2]]

    # resize to 300x300 if needed
    if cropped_frame.shape != (300, 300, 3):
        # TODO: use Pillow-SIMD?
        cropped_frame = cv2.resize(cropped_frame, dsize=(300, 300), interpolation=cv2.INTER_LINEAR)

    # expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
    return np.expand_dims(cropped_frame, axis=0)
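
# Note: the frames are read as raw uint8 below, so the tensor handed to the
# interpreter is uint8 -- which is what the quantized SSD model used here
# expects. A float model would need the input normalized first.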

class MotionDetector():
    # TODO: add motion masking
    def __init__(self, frame_shape, resize_factor=4):
        self.resize_factor = resize_factor
        self.motion_frame_size = (int(frame_shape[0] / resize_factor), int(frame_shape[1] / resize_factor))
        # use float32 explicitly; the np.float alias is removed in newer NumPy releases
        self.avg_frame = np.zeros(self.motion_frame_size, np.float32)
        self.avg_delta = np.zeros(self.motion_frame_size, np.float32)
        self.motion_frame_count = 0
        self.frame_counter = 0

    def detect(self, frame):
        motion_boxes = []

        # resize frame
        resized_frame = cv2.resize(frame, dsize=(self.motion_frame_size[1], self.motion_frame_size[0]), interpolation=cv2.INTER_LINEAR)

        # convert to grayscale
        gray = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)

        # it takes ~30 frames to establish a baseline,
        # so don't bother looking for motion until then
        if self.frame_counter < 30:
            self.frame_counter += 1
        else:
            # compare to the average
            frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(self.avg_frame))

            # compute the average delta over the past few frames. the alpha value
            # can be modified to configure how sensitive the motion detection is:
            # higher values mean the current frame impacts the delta a lot, and a
            # single raindrop may register as motion; too low, and a fast-moving
            # person won't be detected as motion. this also assumes that a person
            # is in the same location across more than a single frame.
            cv2.accumulateWeighted(frameDelta, self.avg_delta, 0.2)

            # compute the threshold image for the current frame
            current_thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]

            # black out everything in the avg_delta where there isn't motion in the current frame
            avg_delta_image = cv2.convertScaleAbs(self.avg_delta)
            avg_delta_image[np.where(current_thresh == [0])] = [0]

            # then look for deltas above the threshold, but only in areas where
            # there is a delta in the current frame. this prevents deltas from
            # previous frames from being included.
            thresh = cv2.threshold(avg_delta_image, 25, 255, cv2.THRESH_BINARY)[1]

            # dilate the thresholded image to fill in holes, then find contours
            # on the thresholded image
            thresh = cv2.dilate(thresh, None, iterations=2)
            cnts = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            cnts = imutils.grab_contours(cnts)

            # loop over the contours
            for c in cnts:
                # if the contour is big enough, count it as motion
                contour_area = cv2.contourArea(c)
                if contour_area > 100:
                    # cv2.drawContours(resized_frame, [c], -1, (255,255,255), 2)
                    x, y, w, h = cv2.boundingRect(c)
                    # scale the box back up to full-frame coordinates
                    motion_boxes.append((x * self.resize_factor, y * self.resize_factor, (x + w) * self.resize_factor, (y + h) * self.resize_factor))

        if len(motion_boxes) > 0:
            self.motion_frame_count += 1
            # TODO: this really depends on FPS
            if self.motion_frame_count >= 10:
                # only average in the current frame if the difference
                # persists for at least 10 frames
                cv2.accumulateWeighted(gray, self.avg_frame, 0.2)
        else:
            # when there is no motion, just keep averaging the frames together
            cv2.accumulateWeighted(gray, self.avg_frame, 0.2)
            self.motion_frame_count = 0

        return motion_boxes
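
# Example usage (a sketch; the shape matches the raw frames read in main()):
#   motion = MotionDetector((720, 1280, 3), resize_factor=4)
#   boxes = motion.detect(frame)  # [(x_min, y_min, x_max, y_max), ...]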

class ObjectDetector():
    def __init__(self, model_file, label_file):
        self.labels = load_labels(label_file)
        edge_tpu_delegate = None
        try:
            edge_tpu_delegate = load_delegate('libedgetpu.so.1.0')
        except ValueError:
            print("No EdgeTPU detected. Falling back to CPU.")

        if edge_tpu_delegate is None:
            self.interpreter = tflite.Interpreter(
                model_path=model_file)
        else:
            self.interpreter = tflite.Interpreter(
                model_path=model_file,
                experimental_delegates=[edge_tpu_delegate])

        self.interpreter.allocate_tensors()

        self.tensor_input_details = self.interpreter.get_input_details()
        self.tensor_output_details = self.interpreter.get_output_details()

    def detect(self, tensor_input, threshold=.4):
        self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
        self.interpreter.invoke()
        boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
        label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
        scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))

        detections = []
        for i, score in enumerate(scores):
            # scores are sorted in descending order, so stop at the first one
            # below the threshold; only look up labels for detections we keep
            if score < threshold:
                break
            detections.append((
                self.labels[int(label_codes[i])],
                float(score),
                boxes[i]
            ))

        return detections
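
# Example usage (a sketch; the model and label paths are the ones used in
# main() below, and boxes come back normalized as [ymin, xmin, ymax, xmax]):
#   detector = ObjectDetector('/lab/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite', '/lab/labelmap.txt')
#   detections = detector.detect(tensor_input)  # [(label, score, box), ...]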

class ObjectTracker():
    def __init__(self, max_disappeared):
        self.tracked_objects = {}
        self.disappeared = {}
        self.max_disappeared = max_disappeared

    def register(self, index, frame_time, obj):
        id = f"{frame_time}-{index}"
        obj['id'] = id
        obj['frame_time'] = frame_time
        obj['top_score'] = obj['score']
        self.add_history(obj)
        self.tracked_objects[id] = obj
        self.disappeared[id] = 0

    def deregister(self, id):
        del self.tracked_objects[id]
        del self.disappeared[id]

    def update(self, id, new_obj):
        self.disappeared[id] = 0
        self.tracked_objects[id].update(new_obj)
        self.add_history(self.tracked_objects[id])
        if self.tracked_objects[id]['score'] > self.tracked_objects[id]['top_score']:
            self.tracked_objects[id]['top_score'] = self.tracked_objects[id]['score']

    def add_history(self, obj):
        entry = {
            'score': obj['score'],
            'box': obj['box'],
            'region': obj['region'],
            'centroid': obj['centroid'],
            'frame_time': obj['frame_time']
        }
        if 'history' in obj:
            obj['history'].append(entry)
        else:
            obj['history'] = [entry]

    def match_and_update(self, frame_time, new_objects):
        if len(new_objects) == 0:
            for id in list(self.tracked_objects.keys()):
                if self.disappeared[id] >= self.max_disappeared:
                    self.deregister(id)
                else:
                    self.disappeared[id] += 1
            return

        # group by name
        new_object_groups = defaultdict(lambda: [])
        for obj in new_objects:
            new_object_groups[obj[0]].append({
                'label': obj[0],
                'score': obj[1],
                'box': obj[2],
                'region': obj[3],
                # stamp the detection time so updates refresh frame_time too
                'frame_time': frame_time
            })

        # track objects for each label type
        for label, group in new_object_groups.items():
            current_objects = [o for o in self.tracked_objects.values() if o['label'] == label]
            current_ids = [o['id'] for o in current_objects]
            current_centroids = np.array([o['centroid'] for o in current_objects])

            # compute centroids of new objects
            for obj in group:
                centroid_x = int((obj['box'][0] + obj['box'][2]) / 2.0)
                centroid_y = int((obj['box'][1] + obj['box'][3]) / 2.0)
                obj['centroid'] = (centroid_x, centroid_y)

            if len(current_objects) == 0:
                for index, obj in enumerate(group):
                    self.register(index, frame_time, obj)
                # move on to the next label group rather than returning,
                # which would skip the remaining groups
                continue

            new_centroids = np.array([o['centroid'] for o in group])

            # compute the distance between each pair of tracked centroids and
            # new centroids -- the goal is to match each new centroid to an
            # existing object centroid
            D = dist.cdist(current_centroids, new_centroids)

            # find the smallest value in each row, then sort the row indexes
            # based on their minimum values so that the row with the smallest
            # value is at the *front* of the index list
            rows = D.min(axis=1).argsort()

            # next, perform a similar process on the columns by finding the
            # smallest value in each column and then sorting using the
            # previously computed row index list
            cols = D.argmin(axis=1)[rows]

            # to determine whether we need to update, register, or deregister
            # an object, keep track of which row and column indexes have
            # already been examined
            usedRows = set()
            usedCols = set()

            # loop over the combination of the (row, column) index tuples
            for (row, col) in zip(rows, cols):
                # if we have already examined either the row or
                # column value before, ignore it
                if row in usedRows or col in usedCols:
                    continue
                # otherwise, grab the object ID for the current row, set its
                # new centroid, and reset the disappeared counter
                objectID = current_ids[row]
                self.update(objectID, group[col])
                # indicate that we have examined each of the row and
                # column indexes, respectively
                usedRows.add(row)
                usedCols.add(col)

            # compute the row and column indexes we have NOT yet examined
            unusedRows = set(range(0, D.shape[0])).difference(usedRows)
            unusedCols = set(range(0, D.shape[1])).difference(usedCols)

            # if the number of tracked centroids is equal to or greater than
            # the number of input centroids, check whether some of these
            # objects have potentially disappeared
            if D.shape[0] >= D.shape[1]:
                for row in unusedRows:
                    id = current_ids[row]
                    if self.disappeared[id] >= self.max_disappeared:
                        self.deregister(id)
                    else:
                        self.disappeared[id] += 1
            # if the number of input centroids is greater than the number of
            # existing object centroids, register each new input centroid as
            # a trackable object
            else:
                for col in unusedCols:
                    self.register(col, frame_time, group[col])
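
# Example usage (a sketch; detections use the (label, score, box, region)
# tuples built in main() below):
#   tracker = ObjectTracker(max_disappeared=10)
#   tracker.match_and_update(frame_time, detections)
#   tracker.tracked_objects  # {id: {'label': ..., 'box': ..., 'history': [...]}}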

def main():
    frames = 0
    # frame_queue = queue.Queue(maxsize=5)
    # frame_cache = {}
    # frame_shape = (1080,1920,3)
    frame_shape = (720, 1280, 3)
    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
    frame = np.zeros(frame_shape, np.uint8)

    motion_detector = MotionDetector(frame_shape, resize_factor=4)
    object_detector = ObjectDetector('/lab/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite', '/lab/labelmap.txt')
    # object_detector = ObjectDetector('/lab/detect.tflite', '/lab/labelmap.txt')
    object_tracker = ObjectTracker(10)

    # f = open('/debug/input/back.rgb24', 'rb')
    f = open('/debug/back.raw_video', 'rb')
    # f = open('/debug/ali-jake.raw_video', 'rb')

    # counts detector invocations (one per region), not detected objects
    total_detections = 0
    start = datetime.datetime.now().timestamp()

    while True:
        frame_detections = 0
        frame_bytes = f.read(frame_size)
        if not frame_bytes:
            break
        frame_time = datetime.datetime.now().timestamp()

        # store the raw frame in a numpy array
        frame[:] = (np
            .frombuffer(frame_bytes, np.uint8)
            .reshape(frame_shape))

        frames += 1

        # look for motion
        motion_boxes = motion_detector.detect(frame)

        tracked_objects = object_tracker.tracked_objects.values()

        # merge areas of motion that intersect with a known tracked object into a single area to look at
        areas_of_interest = []
        used_motion_boxes = []
        for obj in tracked_objects:
            x_min, y_min, x_max, y_max = obj['box']
            for m_index, motion_box in enumerate(motion_boxes):
                if area(intersection(obj['box'], motion_box)) / area(motion_box) > .5:
                    used_motion_boxes.append(m_index)
                    # expand against the running extents so every intersecting
                    # motion box contributes, not just the last one
                    x_min = min(x_min, motion_box[0])
                    y_min = min(y_min, motion_box[1])
                    x_max = max(x_max, motion_box[2])
                    y_max = max(y_max, motion_box[3])
            areas_of_interest.append((x_min, y_min, x_max, y_max))
        unused_motion_boxes = set(range(0, len(motion_boxes))).difference(used_motion_boxes)

        # compute motion regions
        motion_regions = [calculate_region(frame_shape, motion_boxes[i][0], motion_boxes[i][1], motion_boxes[i][2], motion_boxes[i][3], 1.2)
            for i in unused_motion_boxes]

        # compute tracked object regions
        object_regions = [calculate_region(frame_shape, a[0], a[1], a[2], a[3], 1.2)
            for a in areas_of_interest]

        # merge regions with high IOU
        merged_regions = motion_regions + object_regions
        while True:
            max_iou = 0.0
            max_indices = None
            region_indices = range(len(merged_regions))
            for a, b in itertools.combinations(region_indices, 2):
                iou = intersection_over_union(merged_regions[a], merged_regions[b])
                if iou > max_iou:
                    max_iou = iou
                    max_indices = (a, b)
            if max_iou > 0.1:
                a = merged_regions[max_indices[0]]
                b = merged_regions[max_indices[1]]
                merged_regions.append(calculate_region(frame_shape,
                    min(a[0], b[0]),
                    min(a[1], b[1]),
                    max(a[2], b[2]),
                    max(a[3], b[3]),
                    1
                ))
                del merged_regions[max(max_indices[0], max_indices[1])]
                del merged_regions[min(max_indices[0], max_indices[1])]
            else:
                break
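
        # at this point each remaining region is a square crop; e.g. two
        # overlapping regions collapse into one square covering both, and the
        # loop repeats until no pair overlaps by more than 0.1 IOU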

        # resize regions and detect
        detections = []
        for region in merged_regions:
            tensor_input = create_tensor_input(frame, region)
            region_detections = object_detector.detect(tensor_input)
            frame_detections += 1

            for d in region_detections:
                if filtered(d):
                    continue
                box = d[2]
                # scale the normalized [ymin, xmin, ymax, xmax] box from the
                # square region back to full-frame pixel coordinates
                size = region[2] - region[0]
                x_min = int((box[1] * size) + region[0])
                y_min = int((box[0] * size) + region[1])
                x_max = int((box[3] * size) + region[0])
                y_max = int((box[2] * size) + region[1])
                detections.append((
                    d[0],
                    d[1],
                    (x_min, y_min, x_max, y_max),
                    region))

        #########
        # merge objects, check for clipped objects, and look again up to 4 times
        #########
        refining = True
        refine_count = 0
        while refining and refine_count < 4:
            refining = False

            # group by name
            detected_object_groups = defaultdict(lambda: [])
            for detection in detections:
                detected_object_groups[detection[0]].append(detection)

            selected_objects = []
            for group in detected_object_groups.values():
                # apply non-maxima suppression to suppress weak, overlapping bounding boxes
                boxes = [(o[2][0], o[2][1], o[2][2] - o[2][0], o[2][3] - o[2][1])
                    for o in group]
                confidences = [o[1] for o in group]
                idxs = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)

                # NMSBoxes returns an Nx1 array in older OpenCV releases and a
                # flat array in newer ones; flatten so both shapes work
                for index in np.array(idxs).flatten():
                    obj = group[index]
                    if clipped(obj, frame_shape):
                        box = obj[2]
                        # calculate a new region that will hopefully get the entire object
                        region = calculate_region(frame_shape,
                            box[0], box[1],
                            box[2], box[3])

                        tensor_input = create_tensor_input(frame, region)
                        # run detection on the new region
                        refined_detections = object_detector.detect(tensor_input)
                        frame_detections += 1
                        for d in refined_detections:
                            if filtered(d):
                                continue
                            box = d[2]
                            size = region[2] - region[0]
                            x_min = int((box[1] * size) + region[0])
                            y_min = int((box[0] * size) + region[1])
                            x_max = int((box[3] * size) + region[0])
                            y_max = int((box[2] * size) + region[1])
                            selected_objects.append((
                                d[0],
                                d[1],
                                (x_min, y_min, x_max, y_max),
                                region))
                        refining = True
                    else:
                        selected_objects.append(obj)

            # set the detections list to only include top, complete objects
            # and new detections
            detections = selected_objects

            if refining:
                refine_count += 1

        # now that the detections have been refined, pass them to the tracker
        object_tracker.match_and_update(frame_time, detections)

        total_detections += frame_detections

        # if (frames >= 700 and frames <= 1635) or (frames >= 2500):
        # if (frames >= 700 and frames <= 1000):
        # if (frames >= 0):
        #     # row1 = cv2.hconcat([gray, cv2.convertScaleAbs(avg_frame)])
        #     # row2 = cv2.hconcat([frameDelta, thresh])
        #     # cv2.imwrite(f"/lab/debug/output/{frames}.jpg", cv2.vconcat([row1, row2]))
        #     # # cv2.imwrite(f"/lab/debug/output/resized-frame-{frames}.jpg", resized_frame)
        #     # for region in motion_regions:
        #     #     cv2.rectangle(frame, (region[0], region[1]), (region[2], region[3]), (255,128,0), 2)
        #     # for region in object_regions:
        #     #     cv2.rectangle(frame, (region[0], region[1]), (region[2], region[3]), (0,128,255), 2)
        #     for region in merged_regions:
        #         cv2.rectangle(frame, (region[0], region[1]), (region[2], region[3]), (0,255,0), 2)
        #     for box in motion_boxes:
        #         cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), (255,0,0), 2)
        #     for detection in detections:
        #         box = detection[2]
        #         draw_box_with_label(frame, box[0], box[1], box[2], box[3], detection[0], f"{detection[1]*100}%")
        #     for obj in object_tracker.tracked_objects.values():
        #         box = obj['box']
        #         draw_box_with_label(frame, box[0], box[1], box[2], box[3], obj['label'], obj['id'], thickness=1, color=(0,0,255), position='bl')
        #     cv2.putText(frame, str(total_detections), (10, 10), cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.5, color=(0, 0, 0), thickness=2)
        #     cv2.putText(frame, str(frame_detections), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.5, color=(0, 0, 0), thickness=2)
        #     cv2.imwrite(f"/lab/debug/output/frame-{frames}.jpg", frame)
        # break

    f.close()

    duration = datetime.datetime.now().timestamp() - start
    print(f"Processed {frames} frames in {duration:.2f} seconds ({(frames/duration):.2f} FPS).")
    print(f"Total detection calls: {total_detections}")

if __name__ == '__main__':
    main()