video.py

import os
import time
import datetime
import cv2
import threading
import ctypes
import multiprocessing as mp
import numpy as np
from object_detection.utils import visualization_utils as vis_util
from .util import tonumpyarray
from .object_detection import FramePrepper
from .objects import ObjectCleaner, BestPersonFrame
from .mqtt import MqttObjectPublisher

# fetch frames as fast as possible and store the current frame in a shared memory array
def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_shape, rtsp_url):
    # convert the shared memory array into a numpy array shaped like an image
    arr = tonumpyarray(shared_arr).reshape(frame_shape)

    # start the video capture
    video = cv2.VideoCapture()
    video.open(rtsp_url)
    print("Opening the RTSP Url...")
    # keep the buffer small so we minimize old data
    video.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    bad_frame_counter = 0
    while True:
        # check if the video stream is still open, and reopen if needed
        if not video.isOpened():
            success = video.open(rtsp_url)
            if not success:
                time.sleep(1)
                continue
        # grab the frame, but don't decode it yet
        ret = video.grab()
        # snapshot the time the frame was grabbed
        frame_time = datetime.datetime.now()
        if ret:
            # go ahead and decode the current frame
            ret, frame = video.retrieve()
            if ret:
                # lock access and update the frame
                with frame_lock:
                    arr[:] = frame
                    shared_frame_time.value = frame_time.timestamp()
                # notify with the condition that a new frame is ready
                with frame_ready:
                    frame_ready.notify_all()
                bad_frame_counter = 0
            else:
                print("Unable to decode frame")
                bad_frame_counter += 1
        else:
            print("Unable to grab a frame")
            bad_frame_counter += 1
        # too many consecutive failures: exit the loop and release the capture
        if bad_frame_counter > 100:
            break

    video.release()

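# NOTE: when the capture loop above exits after repeated bad frames, the
# CameraWatchdog defined below notices the stale shared_frame_time and
# restarts the capture process.
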
# Stores 2 seconds' worth of frames when motion is detected so they can be used by other threads
class FrameTracker(threading.Thread):
    def __init__(self, shared_frame, frame_time, frame_ready, frame_lock, recent_frames):
        threading.Thread.__init__(self)
        self.shared_frame = shared_frame
        self.frame_time = frame_time
        self.frame_ready = frame_ready
        self.frame_lock = frame_lock
        self.recent_frames = recent_frames

    def run(self):
        frame_time = 0.0
        while True:
            now = datetime.datetime.now().timestamp()
            # wait for a frame
            with self.frame_ready:
                # if there isn't a frame ready for processing or it is old, wait for a signal
                if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5:
                    self.frame_ready.wait()

            # lock and make a copy of the frame
            with self.frame_lock:
                frame = self.shared_frame.copy()
                frame_time = self.frame_time.value

            # add the frame to recent frames
            self.recent_frames[frame_time] = frame

            # delete any old frames
            stored_frame_times = list(self.recent_frames.keys())
            for k in stored_frame_times:
                if (now - k) > 2:
                    del self.recent_frames[k]

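# NOTE: recent_frames is shared with the BestPersonFrame thread created in
# Camera below, which pulls the highest-scoring person frame out of this
# rolling two second window.
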
def get_frame_shape(rtsp_url):
    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(rtsp_url)
    ret, frame = video.read()
    frame_shape = frame.shape
    video.release()
    return frame_shape

def get_rtsp_url(rtsp_config):
    # passwords of the form $VAR_NAME are resolved from the environment
    if rtsp_config['password'].startswith('$'):
        rtsp_config['password'] = os.getenv(rtsp_config['password'][1:])
    return 'rtsp://{}:{}@{}:{}{}'.format(rtsp_config['user'],
        rtsp_config['password'], rtsp_config['host'], rtsp_config['port'],
        rtsp_config['path'])

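# Hypothetical example of the rtsp config consumed above (values are
# illustrative, not part of this module):
#   rtsp:
#     user: admin
#     password: $RTSP_PASSWORD   # resolved from the environment at runtime
#     host: 192.168.1.10
#     port: 554
#     path: /live
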
class CameraWatchdog(threading.Thread):
    def __init__(self, camera):
        threading.Thread.__init__(self)
        self.camera = camera

    def run(self):
        while True:
            # wait a bit before checking
            time.sleep(60)

            if (datetime.datetime.now().timestamp() - self.camera.shared_frame_time.value) > 2:
                print("last frame is more than 2 seconds old, restarting camera capture...")
                self.camera.start_or_restart_capture()
                time.sleep(5)

class Camera:
    def __init__(self, name, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
        self.name = name
        self.config = config
        self.detected_objects = []
        self.recent_frames = {}
        self.rtsp_url = get_rtsp_url(self.config['rtsp'])
        self.regions = self.config['regions']
        self.frame_shape = get_frame_shape(self.rtsp_url)
        self.mqtt_client = mqtt_client
        self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name)

        # compute the flattened array length from the shape of the frame
        flat_array_length = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
        # create a shared array for storing the full frame image data
        self.shared_frame_array = mp.Array(ctypes.c_uint8, flat_array_length)
        # create a shared value for storing the frame_time
        self.shared_frame_time = mp.Value('d', 0.0)
        # lock to control access to the frame
        self.frame_lock = mp.Lock()
        # condition for notifying that a new frame is ready
        self.frame_ready = mp.Condition()
        # condition for notifying that objects were parsed
        self.objects_parsed = mp.Condition()

        # shape the current frame so it can be treated as a numpy image
        self.shared_frame_np = tonumpyarray(self.shared_frame_array).reshape(self.frame_shape)

        self.capture_process = None

        # for each region, create a separate thread to resize the region and prep for detection
        self.detection_prep_threads = []
        for region in self.config['regions']:
            # set a default threshold of 0.5 if not defined
            if 'threshold' not in region:
                region['threshold'] = 0.5
            if not isinstance(region['threshold'], float):
                print('Threshold is not a float. Setting to 0.5 default.')
                region['threshold'] = 0.5
            self.detection_prep_threads.append(FramePrepper(
                self.name,
                self.shared_frame_np,
                self.shared_frame_time,
                self.frame_ready,
                self.frame_lock,
                region['size'], region['x_offset'], region['y_offset'], region['threshold'],
                prepped_frame_queue
            ))

        # start a thread to store recent motion frames for processing
        self.frame_tracker = FrameTracker(self.shared_frame_np, self.shared_frame_time,
            self.frame_ready, self.frame_lock, self.recent_frames)
        self.frame_tracker.start()

        # start a thread to store the highest scoring recent person frame
        self.best_person_frame = BestPersonFrame(self.objects_parsed, self.recent_frames, self.detected_objects)
        self.best_person_frame.start()

        # start a thread to expire objects from the detected objects list
        self.object_cleaner = ObjectCleaner(self.objects_parsed, self.detected_objects)
        self.object_cleaner.start()

        # start a thread to publish object scores (currently only person)
        mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self.objects_parsed, self.detected_objects)
        mqtt_publisher.start()

        # create a watchdog thread for the capture process
        self.watchdog = CameraWatchdog(self)

        # load in the mask for person detection; pixels with value 0 are
        # ignored when deciding whether to keep a detected person
        if 'mask' in self.config:
            self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE)
        else:
            # no mask configured, so use an all-255 mask that keeps everything
            self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8)
            self.mask[:] = 255

    def start_or_restart_capture(self):
        if self.capture_process is not None:
            print("Terminating the existing capture process...")
            self.capture_process.terminate()
            del self.capture_process
            self.capture_process = None

        # create the process to capture frames from the RTSP stream and store them in a shared array
        print("Creating a new capture process...")
        self.capture_process = mp.Process(target=fetch_frames, args=(self.shared_frame_array,
            self.shared_frame_time, self.frame_lock, self.frame_ready, self.frame_shape, self.rtsp_url))
        self.capture_process.daemon = True
        print("Starting a new capture process...")
        self.capture_process.start()

    def start(self):
        self.start_or_restart_capture()
        # start the object detection prep threads
        for detection_prep_thread in self.detection_prep_threads:
            detection_prep_thread.start()
        self.watchdog.start()

    def join(self):
        self.capture_process.join()

    def get_capture_pid(self):
        return self.capture_process.pid

    def add_objects(self, objects):
        if len(objects) == 0:
            return

        for obj in objects:
            if obj['name'] == 'person':
                person_area = (obj['xmax'] - obj['xmin']) * (obj['ymax'] - obj['ymin'])
                # find the matching region
                region = None
                for r in self.regions:
                    if (
                        obj['xmin'] >= r['x_offset'] and
                        obj['ymin'] >= r['y_offset'] and
                        obj['xmax'] <= r['x_offset'] + r['size'] and
                        obj['ymax'] <= r['y_offset'] + r['size']
                    ):
                        region = r
                        break

                # if the min person area is larger than the
                # detected person, don't add it to detected objects
                if region and region['min_person_area'] > person_area:
                    continue

                # compute the point to check against the mask (the bottom center
                # of the bounding box), clamped so rounding can't push it outside
                # the bounds of the image
                y_location = min(int(obj['ymax']), len(self.mask) - 1)
                x_location = min(int((obj['xmax'] + obj['xmin']) / 2.0), len(self.mask[0]) - 1)

                # if the person is in a masked location, skip it
                if self.mask[y_location][x_location] == 0:
                    continue

            self.detected_objects.append(obj)

        # a single notify_all fans out to ObjectCleaner, BestPersonFrame and
        # MqttObjectPublisher, which all wait on this condition
        with self.objects_parsed:
            self.objects_parsed.notify_all()

    def get_best_person(self):
        return self.best_person_frame.best_frame

    def get_current_frame_with_objects(self):
        # make a copy of the current detected objects
        detected_objects = self.detected_objects.copy()
        # lock and make a copy of the current frame
        with self.frame_lock:
            frame = self.shared_frame_np.copy()

        # convert to RGB for drawing
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # draw the bounding boxes on the frame
        for obj in detected_objects:
            vis_util.draw_bounding_box_on_image_array(frame,
                obj['ymin'],
                obj['xmin'],
                obj['ymax'],
                obj['xmax'],
                color='red',
                thickness=2,
                display_str_list=["{}: {}%".format(obj['name'], int(obj['score']*100))],
                use_normalized_coordinates=False)

        for region in self.regions:
            color = (255, 255, 255)
            cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                (region['x_offset'] + region['size'], region['y_offset'] + region['size']),
                color, 2)

        # convert back to BGR
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        return frame
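
# ---------------------------------------------------------------------------
# Hypothetical usage sketch (not part of the original module): wires a Camera
# up to a paho-mqtt client and a multiprocessing queue that feeds the
# detection process. All names and values below are illustrative assumptions.
# ---------------------------------------------------------------------------
# import multiprocessing as mp
# import paho.mqtt.client as mqtt
#
# config = {
#     'rtsp': {'user': 'admin', 'password': '$RTSP_PASSWORD',
#              'host': '192.168.1.10', 'port': '554', 'path': '/live'},
#     'regions': [{'size': 300, 'x_offset': 0, 'y_offset': 0,
#                  'min_person_area': 5000, 'threshold': 0.5}],
# }
# prepped_frame_queue = mp.Queue()
# client = mqtt.Client()
# client.connect('mqtt-broker')
# camera = Camera('back_yard', config, prepped_frame_queue, client, 'cameras')
# camera.start()
# camera.join()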