123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517 |
- import collections
- import datetime
- import hashlib
- import json
- import logging
- import math
- import signal
- import subprocess as sp
- import threading
- import time
- import traceback
- from abc import ABC, abstractmethod
- from multiprocessing import shared_memory
- from typing import AnyStr
- import cv2
- import matplotlib.pyplot as plt
- import numpy as np
- logger = logging.getLogger(__name__)
- def draw_box_with_label(
- frame,
- x_min,
- y_min,
- x_max,
- y_max,
- label,
- info,
- thickness=2,
- color=None,
- position="ul",
- ):
- if color is None:
- color = (0, 0, 255)
- display_text = "{}: {}".format(label, info)
- cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, thickness)
- font_scale = 0.5
- font = cv2.FONT_HERSHEY_SIMPLEX
- # get the width and height of the text box
- size = cv2.getTextSize(display_text, font, fontScale=font_scale, thickness=2)
- text_width = size[0][0]
- text_height = size[0][1]
- line_height = text_height + size[1]
- # set the text start position
- if position == "ul":
- text_offset_x = x_min
- text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
- elif position == "ur":
- text_offset_x = x_max - (text_width + 8)
- text_offset_y = 0 if y_min < line_height else y_min - (line_height + 8)
- elif position == "bl":
- text_offset_x = x_min
- text_offset_y = y_max
- elif position == "br":
- text_offset_x = x_max - (text_width + 8)
- text_offset_y = y_max
- # make the coords of the box with a small padding of two pixels
- textbox_coords = (
- (text_offset_x, text_offset_y),
- (text_offset_x + text_width + 2, text_offset_y + line_height),
- )
- cv2.rectangle(frame, textbox_coords[0], textbox_coords[1], color, cv2.FILLED)
- cv2.putText(
- frame,
- display_text,
- (text_offset_x, text_offset_y + line_height - 3),
- font,
- fontScale=font_scale,
- color=(0, 0, 0),
- thickness=2,
- )
- def calculate_region(frame_shape, xmin, ymin, xmax, ymax, multiplier=2):
- # size is the longest edge and divisible by 4
- size = int(max(xmax - xmin, ymax - ymin) // 4 * 4 * multiplier)
- # dont go any smaller than 300
- if size < 300:
- size = 300
- # x_offset is midpoint of bounding box minus half the size
- x_offset = int((xmax - xmin) / 2.0 + xmin - size / 2.0)
- # if outside the image
- if x_offset < 0:
- x_offset = 0
- elif x_offset > (frame_shape[1] - size):
- x_offset = max(0, (frame_shape[1] - size))
- # y_offset is midpoint of bounding box minus half the size
- y_offset = int((ymax - ymin) / 2.0 + ymin - size / 2.0)
- # # if outside the image
- if y_offset < 0:
- y_offset = 0
- elif y_offset > (frame_shape[0] - size):
- y_offset = max(0, (frame_shape[0] - size))
- return (x_offset, y_offset, x_offset + size, y_offset + size)
- def get_yuv_crop(frame_shape, crop):
- # crop should be (x1,y1,x2,y2)
- frame_height = frame_shape[0] // 3 * 2
- frame_width = frame_shape[1]
- # compute the width/height of the uv channels
- uv_width = frame_width // 2 # width of the uv channels
- uv_height = frame_height // 4 # height of the uv channels
- # compute the offset for upper left corner of the uv channels
- uv_x_offset = crop[0] // 2 # x offset of the uv channels
- uv_y_offset = crop[1] // 4 # y offset of the uv channels
- # compute the width/height of the uv crops
- uv_crop_width = (crop[2] - crop[0]) // 2 # width of the cropped uv channels
- uv_crop_height = (crop[3] - crop[1]) // 4 # height of the cropped uv channels
- # ensure crop dimensions are multiples of 2 and 4
- y = (crop[0], crop[1], crop[0] + uv_crop_width * 2, crop[1] + uv_crop_height * 4)
- u1 = (
- 0 + uv_x_offset,
- frame_height + uv_y_offset,
- 0 + uv_x_offset + uv_crop_width,
- frame_height + uv_y_offset + uv_crop_height,
- )
- u2 = (
- uv_width + uv_x_offset,
- frame_height + uv_y_offset,
- uv_width + uv_x_offset + uv_crop_width,
- frame_height + uv_y_offset + uv_crop_height,
- )
- v1 = (
- 0 + uv_x_offset,
- frame_height + uv_height + uv_y_offset,
- 0 + uv_x_offset + uv_crop_width,
- frame_height + uv_height + uv_y_offset + uv_crop_height,
- )
- v2 = (
- uv_width + uv_x_offset,
- frame_height + uv_height + uv_y_offset,
- uv_width + uv_x_offset + uv_crop_width,
- frame_height + uv_height + uv_y_offset + uv_crop_height,
- )
- return y, u1, u2, v1, v2
- def yuv_crop_and_resize(frame, region, height=None):
- # Crops and resizes a YUV frame while maintaining aspect ratio
- # https://stackoverflow.com/a/57022634
- height = frame.shape[0] // 3 * 2
- width = frame.shape[1]
- # get the crop box if the region extends beyond the frame
- crop_x1 = max(0, region[0])
- crop_y1 = max(0, region[1])
- # ensure these are a multiple of 4
- crop_x2 = min(width, region[2])
- crop_y2 = min(height, region[3])
- crop_box = (crop_x1, crop_y1, crop_x2, crop_y2)
- y, u1, u2, v1, v2 = get_yuv_crop(frame.shape, crop_box)
- # if the region starts outside the frame, indent the start point in the cropped frame
- y_channel_x_offset = abs(min(0, region[0]))
- y_channel_y_offset = abs(min(0, region[1]))
- uv_channel_x_offset = y_channel_x_offset // 2
- uv_channel_y_offset = y_channel_y_offset // 4
- # create the yuv region frame
- # make sure the size is a multiple of 4
- # TODO: this should be based on the size after resize now
- size = (region[3] - region[1]) // 4 * 4
- yuv_cropped_frame = np.zeros((size + size // 2, size), np.uint8)
- # fill in black
- yuv_cropped_frame[:] = 128
- yuv_cropped_frame[0:size, 0:size] = 16
- # copy the y channel
- yuv_cropped_frame[
- y_channel_y_offset : y_channel_y_offset + y[3] - y[1],
- y_channel_x_offset : y_channel_x_offset + y[2] - y[0],
- ] = frame[y[1] : y[3], y[0] : y[2]]
- uv_crop_width = u1[2] - u1[0]
- uv_crop_height = u1[3] - u1[1]
- # copy u1
- yuv_cropped_frame[
- size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
- 0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
- ] = frame[u1[1] : u1[3], u1[0] : u1[2]]
- # copy u2
- yuv_cropped_frame[
- size + uv_channel_y_offset : size + uv_channel_y_offset + uv_crop_height,
- size // 2
- + uv_channel_x_offset : size // 2
- + uv_channel_x_offset
- + uv_crop_width,
- ] = frame[u2[1] : u2[3], u2[0] : u2[2]]
- # copy v1
- yuv_cropped_frame[
- size
- + size // 4
- + uv_channel_y_offset : size
- + size // 4
- + uv_channel_y_offset
- + uv_crop_height,
- 0 + uv_channel_x_offset : 0 + uv_channel_x_offset + uv_crop_width,
- ] = frame[v1[1] : v1[3], v1[0] : v1[2]]
- # copy v2
- yuv_cropped_frame[
- size
- + size // 4
- + uv_channel_y_offset : size
- + size // 4
- + uv_channel_y_offset
- + uv_crop_height,
- size // 2
- + uv_channel_x_offset : size // 2
- + uv_channel_x_offset
- + uv_crop_width,
- ] = frame[v2[1] : v2[3], v2[0] : v2[2]]
- return yuv_cropped_frame
- def copy_yuv_to_position(
- position,
- destination_frame,
- destination_dim,
- source_frame=None,
- source_channel_dim=None,
- ):
- # TODO: consider calculating this on layout reflow instead of all the time
- layout_shape = (
- (destination_frame.shape[0] // 3 * 2) // destination_dim,
- destination_frame.shape[1] // destination_dim,
- )
- # calculate the x and y offset for the frame in the layout
- y_offset = layout_shape[0] * math.floor(position / destination_dim)
- x_offset = layout_shape[1] * (position % destination_dim)
- # get the coordinates of the channels for this position in the layout
- y, u1, u2, v1, v2 = get_yuv_crop(
- destination_frame.shape,
- (
- x_offset,
- y_offset,
- x_offset + layout_shape[1],
- y_offset + layout_shape[0],
- ),
- )
- if source_frame is None:
- # clear y
- destination_frame[
- y[1] : y[3],
- y[0] : y[2],
- ] = 16
- # clear u1
- destination_frame[u1[1] : u1[3], u1[0] : u1[2]] = 128
- # clear u2
- destination_frame[u2[1] : u2[3], u2[0] : u2[2]] = 128
- # clear v1
- destination_frame[v1[1] : v1[3], v1[0] : v1[2]] = 128
- # clear v2
- destination_frame[v2[1] : v2[3], v2[0] : v2[2]] = 128
- else:
- interpolation = cv2.INTER_AREA
- # resize/copy y channel
- destination_frame[y[1] : y[3], y[0] : y[2]] = cv2.resize(
- source_frame[
- source_channel_dim["y"][1] : source_channel_dim["y"][3],
- source_channel_dim["y"][0] : source_channel_dim["y"][2],
- ],
- dsize=(y[2] - y[0], y[3] - y[1]),
- interpolation=interpolation,
- )
- # resize/copy u1
- destination_frame[u1[1] : u1[3], u1[0] : u1[2]] = cv2.resize(
- source_frame[
- source_channel_dim["u1"][1] : source_channel_dim["u1"][3],
- source_channel_dim["u1"][0] : source_channel_dim["u1"][2],
- ],
- dsize=(u1[2] - u1[0], u1[3] - u1[1]),
- interpolation=interpolation,
- )
- # resize/copy u2
- destination_frame[u2[1] : u2[3], u2[0] : u2[2]] = cv2.resize(
- source_frame[
- source_channel_dim["u2"][1] : source_channel_dim["u2"][3],
- source_channel_dim["u2"][0] : source_channel_dim["u2"][2],
- ],
- dsize=(u2[2] - u2[0], u2[3] - u2[1]),
- interpolation=interpolation,
- )
- # resize/copy v1
- destination_frame[v1[1] : v1[3], v1[0] : v1[2]] = cv2.resize(
- source_frame[
- source_channel_dim["v1"][1] : source_channel_dim["v1"][3],
- source_channel_dim["v1"][0] : source_channel_dim["v1"][2],
- ],
- dsize=(v1[2] - v1[0], v1[3] - v1[1]),
- interpolation=interpolation,
- )
- # resize/copy v2
- destination_frame[v2[1] : v2[3], v2[0] : v2[2]] = cv2.resize(
- source_frame[
- source_channel_dim["v2"][1] : source_channel_dim["v2"][3],
- source_channel_dim["v2"][0] : source_channel_dim["v2"][2],
- ],
- dsize=(v2[2] - v2[0], v2[3] - v2[1]),
- interpolation=interpolation,
- )
- def yuv_region_2_rgb(frame, region):
- try:
- # TODO: does this copy the numpy array?
- yuv_cropped_frame = yuv_crop_and_resize(frame, region)
- return cv2.cvtColor(yuv_cropped_frame, cv2.COLOR_YUV2RGB_I420)
- except:
- print(f"frame.shape: {frame.shape}")
- print(f"region: {region}")
- raise
- def intersection(box_a, box_b):
- return (
- max(box_a[0], box_b[0]),
- max(box_a[1], box_b[1]),
- min(box_a[2], box_b[2]),
- min(box_a[3], box_b[3]),
- )
- def area(box):
- return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
- def intersection_over_union(box_a, box_b):
- # determine the (x, y)-coordinates of the intersection rectangle
- intersect = intersection(box_a, box_b)
- # compute the area of intersection rectangle
- inter_area = max(0, intersect[2] - intersect[0] + 1) * max(
- 0, intersect[3] - intersect[1] + 1
- )
- if inter_area == 0:
- return 0.0
- # compute the area of both the prediction and ground-truth
- # rectangles
- box_a_area = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
- box_b_area = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)
- # compute the intersection over union by taking the intersection
- # area and dividing it by the sum of prediction + ground-truth
- # areas - the interesection area
- iou = inter_area / float(box_a_area + box_b_area - inter_area)
- # return the intersection over union value
- return iou
- def clipped(obj, frame_shape):
- # if the object is within 5 pixels of the region border, and the region is not on the edge
- # consider the object to be clipped
- box = obj[2]
- region = obj[4]
- if (
- (region[0] > 5 and box[0] - region[0] <= 5)
- or (region[1] > 5 and box[1] - region[1] <= 5)
- or (frame_shape[1] - region[2] > 5 and region[2] - box[2] <= 5)
- or (frame_shape[0] - region[3] > 5 and region[3] - box[3] <= 5)
- ):
- return True
- else:
- return False
- class EventsPerSecond:
- def __init__(self, max_events=1000):
- self._start = None
- self._max_events = max_events
- self._timestamps = []
- def start(self):
- self._start = datetime.datetime.now().timestamp()
- def update(self):
- if self._start is None:
- self.start()
- self._timestamps.append(datetime.datetime.now().timestamp())
- # truncate the list when it goes 100 over the max_size
- if len(self._timestamps) > self._max_events + 100:
- self._timestamps = self._timestamps[(1 - self._max_events) :]
- def eps(self, last_n_seconds=10):
- if self._start is None:
- self.start()
- # compute the (approximate) events in the last n seconds
- now = datetime.datetime.now().timestamp()
- seconds = min(now - self._start, last_n_seconds)
- return (
- len([t for t in self._timestamps if t > (now - last_n_seconds)]) / seconds
- )
- def print_stack(sig, frame):
- traceback.print_stack(frame)
- def listen():
- signal.signal(signal.SIGUSR1, print_stack)
- def create_mask(frame_shape, mask):
- mask_img = np.zeros(frame_shape, np.uint8)
- mask_img[:] = 255
- if isinstance(mask, list):
- for m in mask:
- add_mask(m, mask_img)
- elif isinstance(mask, str):
- add_mask(mask, mask_img)
- return mask_img
- def add_mask(mask, mask_img):
- points = mask.split(",")
- contour = np.array(
- [[int(points[i]), int(points[i + 1])] for i in range(0, len(points), 2)]
- )
- cv2.fillPoly(mask_img, pts=[contour], color=(0))
- class FrameManager(ABC):
- @abstractmethod
- def create(self, name, size) -> AnyStr:
- pass
- @abstractmethod
- def get(self, name, timeout_ms=0):
- pass
- @abstractmethod
- def close(self, name):
- pass
- @abstractmethod
- def delete(self, name):
- pass
- class DictFrameManager(FrameManager):
- def __init__(self):
- self.frames = {}
- def create(self, name, size) -> AnyStr:
- mem = bytearray(size)
- self.frames[name] = mem
- return mem
- def get(self, name, shape):
- mem = self.frames[name]
- return np.ndarray(shape, dtype=np.uint8, buffer=mem)
- def close(self, name):
- pass
- def delete(self, name):
- del self.frames[name]
- class SharedMemoryFrameManager(FrameManager):
- def __init__(self):
- self.shm_store = {}
- def create(self, name, size) -> AnyStr:
- shm = shared_memory.SharedMemory(name=name, create=True, size=size)
- self.shm_store[name] = shm
- return shm.buf
- def get(self, name, shape):
- if name in self.shm_store:
- shm = self.shm_store[name]
- else:
- shm = shared_memory.SharedMemory(name=name)
- self.shm_store[name] = shm
- return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
- def close(self, name):
- if name in self.shm_store:
- self.shm_store[name].close()
- del self.shm_store[name]
- def delete(self, name):
- if name in self.shm_store:
- self.shm_store[name].close()
- self.shm_store[name].unlink()
- del self.shm_store[name]
|