output.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. import datetime
  2. import glob
  3. import logging
  4. import math
  5. import multiprocessing as mp
  6. import queue
  7. import signal
  8. import subprocess as sp
  9. import threading
  10. from multiprocessing import shared_memory
  11. from wsgiref.simple_server import make_server
  12. import cv2
  13. import numpy as np
  14. from setproctitle import setproctitle
  15. from ws4py.server.wsgirefserver import (
  16. WebSocketWSGIHandler,
  17. WebSocketWSGIRequestHandler,
  18. WSGIServer,
  19. )
  20. from ws4py.server.wsgiutils import WebSocketWSGIApplication
  21. from ws4py.websocket import WebSocket
  22. from frigate.config import BirdseyeModeEnum, FrigateConfig
  23. from frigate.util import SharedMemoryFrameManager, copy_yuv_to_position, get_yuv_crop
  24. logger = logging.getLogger(__name__)
  25. class FFMpegConverter:
  26. def __init__(self, in_width, in_height, out_width, out_height, quality):
  27. ffmpeg_cmd = f"ffmpeg -f rawvideo -pix_fmt yuv420p -video_size {in_width}x{in_height} -i pipe: -f mpegts -s {out_width}x{out_height} -codec:v mpeg1video -q {quality} -bf 0 pipe:".split(
  28. " "
  29. )
  30. self.process = sp.Popen(
  31. ffmpeg_cmd,
  32. stdout=sp.PIPE,
  33. stderr=sp.DEVNULL,
  34. stdin=sp.PIPE,
  35. start_new_session=True,
  36. )
  37. def write(self, b):
  38. self.process.stdin.write(b)
  39. def read(self, length):
  40. try:
  41. return self.process.stdout.read1(length)
  42. except ValueError:
  43. return False
  44. def exit(self):
  45. self.process.terminate()
  46. try:
  47. self.process.communicate(timeout=30)
  48. except sp.TimeoutExpired:
  49. self.process.kill()
  50. self.process.communicate()
  51. class BroadcastThread(threading.Thread):
  52. def __init__(self, camera, converter, websocket_server):
  53. super(BroadcastThread, self).__init__()
  54. self.camera = camera
  55. self.converter = converter
  56. self.websocket_server = websocket_server
  57. def run(self):
  58. while True:
  59. buf = self.converter.read(65536)
  60. if buf:
  61. manager = self.websocket_server.manager
  62. with manager.lock:
  63. websockets = manager.websockets.copy()
  64. ws_iter = iter(websockets.values())
  65. for ws in ws_iter:
  66. if (
  67. not ws.terminated
  68. and ws.environ["PATH_INFO"] == f"/{self.camera}"
  69. ):
  70. try:
  71. ws.send(buf, binary=True)
  72. except:
  73. pass
  74. elif self.converter.process.poll() is not None:
  75. break
  76. class BirdsEyeFrameManager:
  77. def __init__(self, config, frame_manager: SharedMemoryFrameManager):
  78. self.config = config
  79. self.mode = config.birdseye.mode
  80. self.frame_manager = frame_manager
  81. width = config.birdseye.width
  82. height = config.birdseye.height
  83. self.frame_shape = (height, width)
  84. self.yuv_shape = (height * 3 // 2, width)
  85. self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8)
  86. # initialize the frame as black and with the frigate logo
  87. self.blank_frame = np.zeros(self.yuv_shape, np.uint8)
  88. self.blank_frame[:] = 128
  89. self.blank_frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]] = 16
  90. # find and copy the logo on the blank frame
  91. logo_files = glob.glob("/opt/frigate/frigate/birdseye.png")
  92. frigate_logo = None
  93. if len(logo_files) > 0:
  94. frigate_logo = cv2.imread(logo_files[0], cv2.IMREAD_UNCHANGED)
  95. if not frigate_logo is None:
  96. transparent_layer = frigate_logo[:, :, 3]
  97. y_offset = height // 2 - transparent_layer.shape[0] // 2
  98. x_offset = width // 2 - transparent_layer.shape[1] // 2
  99. self.blank_frame[
  100. y_offset : y_offset + transparent_layer.shape[1],
  101. x_offset : x_offset + transparent_layer.shape[0],
  102. ] = transparent_layer
  103. else:
  104. logger.warning("Unable to read frigate logo")
  105. self.frame[:] = self.blank_frame
  106. self.cameras = {}
  107. for camera, settings in self.config.cameras.items():
  108. # precalculate the coordinates for all the channels
  109. y, u1, u2, v1, v2 = get_yuv_crop(
  110. settings.frame_shape_yuv,
  111. (
  112. 0,
  113. 0,
  114. settings.frame_shape[1],
  115. settings.frame_shape[0],
  116. ),
  117. )
  118. self.cameras[camera] = {
  119. "last_active_frame": 0.0,
  120. "current_frame": 0.0,
  121. "layout_frame": 0.0,
  122. "channel_dims": {
  123. "y": y,
  124. "u1": u1,
  125. "u2": u2,
  126. "v1": v1,
  127. "v2": v2,
  128. },
  129. }
  130. self.camera_layout = []
  131. self.active_cameras = set()
  132. self.layout_dim = 0
  133. self.last_output_time = 0.0
  134. def clear_frame(self):
  135. logger.debug(f"Clearing the birdseye frame")
  136. self.frame[:] = self.blank_frame
  137. def copy_to_position(self, position, camera=None, frame_time=None):
  138. if camera is None:
  139. frame = None
  140. channel_dims = None
  141. else:
  142. try:
  143. frame = self.frame_manager.get(
  144. f"{camera}{frame_time}", self.config.cameras[camera].frame_shape_yuv
  145. )
  146. except FileNotFoundError:
  147. # TODO: better frame management would prevent this edge case
  148. logger.warning(
  149. f"Unable to copy frame {camera}{frame_time} to birdseye."
  150. )
  151. return
  152. channel_dims = self.cameras[camera]["channel_dims"]
  153. copy_yuv_to_position(
  154. self.frame,
  155. self.layout_offsets[position],
  156. self.layout_frame_shape,
  157. frame,
  158. channel_dims,
  159. )
  160. def camera_active(self, object_box_count, motion_box_count):
  161. if self.mode == BirdseyeModeEnum.continuous:
  162. return True
  163. if (
  164. self.mode == BirdseyeModeEnum.motion
  165. and object_box_count + motion_box_count > 0
  166. ):
  167. return True
  168. if self.mode == BirdseyeModeEnum.objects and object_box_count > 0:
  169. return True
  170. def update_frame(self):
  171. # determine how many cameras are tracking objects within the last 30 seconds
  172. active_cameras = set(
  173. [
  174. cam
  175. for cam, cam_data in self.cameras.items()
  176. if cam_data["last_active_frame"] > 0
  177. and cam_data["current_frame"] - cam_data["last_active_frame"] < 30
  178. ]
  179. )
  180. # if there are no active cameras
  181. if len(active_cameras) == 0:
  182. # if the layout is already cleared
  183. if len(self.camera_layout) == 0:
  184. return False
  185. # if the layout needs to be cleared
  186. else:
  187. self.camera_layout = []
  188. self.layout_dim = 0
  189. self.clear_frame()
  190. return True
  191. # calculate layout dimensions
  192. layout_dim = math.ceil(math.sqrt(len(active_cameras)))
  193. # reset the layout if it needs to be different
  194. if layout_dim != self.layout_dim:
  195. logger.debug(f"Changing layout size from {self.layout_dim} to {layout_dim}")
  196. self.layout_dim = layout_dim
  197. self.camera_layout = [None] * layout_dim * layout_dim
  198. # calculate resolution of each position in the layout
  199. self.layout_frame_shape = (
  200. self.frame_shape[0] // layout_dim, # height
  201. self.frame_shape[1] // layout_dim, # width
  202. )
  203. self.clear_frame()
  204. for cam_data in self.cameras.values():
  205. cam_data["layout_frame"] = 0.0
  206. self.active_cameras = set()
  207. self.layout_offsets = []
  208. # calculate the x and y offset for each position in the layout
  209. for position in range(0, len(self.camera_layout)):
  210. y_offset = self.layout_frame_shape[0] * math.floor(
  211. position / self.layout_dim
  212. )
  213. x_offset = self.layout_frame_shape[1] * (position % self.layout_dim)
  214. self.layout_offsets.append((y_offset, x_offset))
  215. removed_cameras = self.active_cameras.difference(active_cameras)
  216. added_cameras = active_cameras.difference(self.active_cameras)
  217. self.active_cameras = active_cameras
  218. # update each position in the layout
  219. for position, camera in enumerate(self.camera_layout, start=0):
  220. # if this camera was removed, replace it or clear it
  221. if camera in removed_cameras:
  222. # if replacing this camera with a newly added one
  223. if len(added_cameras) > 0:
  224. added_camera = added_cameras.pop()
  225. self.camera_layout[position] = added_camera
  226. self.copy_to_position(
  227. position,
  228. added_camera,
  229. self.cameras[added_camera]["current_frame"],
  230. )
  231. self.cameras[added_camera]["layout_frame"] = self.cameras[
  232. added_camera
  233. ]["current_frame"]
  234. # if removing this camera with no replacement
  235. else:
  236. self.camera_layout[position] = None
  237. self.copy_to_position(position)
  238. removed_cameras.remove(camera)
  239. # if an empty spot and there are cameras to add
  240. elif camera is None and len(added_cameras) > 0:
  241. added_camera = added_cameras.pop()
  242. self.camera_layout[position] = added_camera
  243. self.copy_to_position(
  244. position,
  245. added_camera,
  246. self.cameras[added_camera]["current_frame"],
  247. )
  248. self.cameras[added_camera]["layout_frame"] = self.cameras[added_camera][
  249. "current_frame"
  250. ]
  251. # if not an empty spot and the camera has a newer frame, copy it
  252. elif (
  253. not camera is None
  254. and self.cameras[camera]["current_frame"]
  255. != self.cameras[camera]["layout_frame"]
  256. ):
  257. self.copy_to_position(
  258. position, camera, self.cameras[camera]["current_frame"]
  259. )
  260. self.cameras[camera]["layout_frame"] = self.cameras[camera][
  261. "current_frame"
  262. ]
  263. return True
  264. def update(self, camera, object_count, motion_count, frame_time, frame) -> bool:
  265. # update the last active frame for the camera
  266. self.cameras[camera]["current_frame"] = frame_time
  267. if self.camera_active(object_count, motion_count):
  268. self.cameras[camera]["last_active_frame"] = frame_time
  269. now = datetime.datetime.now().timestamp()
  270. # limit output to 10 fps
  271. if (now - self.last_output_time) < 1 / 10:
  272. return False
  273. # if the frame was updated or the fps is too low, send frame
  274. if self.update_frame() or (now - self.last_output_time) > 1:
  275. self.last_output_time = now
  276. return True
  277. return False
  278. def output_frames(config: FrigateConfig, video_output_queue):
  279. threading.current_thread().name = f"output"
  280. setproctitle(f"frigate.output")
  281. stop_event = mp.Event()
  282. def receiveSignal(signalNumber, frame):
  283. stop_event.set()
  284. signal.signal(signal.SIGTERM, receiveSignal)
  285. signal.signal(signal.SIGINT, receiveSignal)
  286. frame_manager = SharedMemoryFrameManager()
  287. previous_frames = {}
  288. # start a websocket server on 8082
  289. WebSocketWSGIHandler.http_version = "1.1"
  290. websocket_server = make_server(
  291. "127.0.0.1",
  292. 8082,
  293. server_class=WSGIServer,
  294. handler_class=WebSocketWSGIRequestHandler,
  295. app=WebSocketWSGIApplication(handler_cls=WebSocket),
  296. )
  297. websocket_server.initialize_websockets_manager()
  298. websocket_thread = threading.Thread(target=websocket_server.serve_forever)
  299. converters = {}
  300. broadcasters = {}
  301. for camera, cam_config in config.cameras.items():
  302. width = int(
  303. cam_config.live.height
  304. * (cam_config.frame_shape[1] / cam_config.frame_shape[0])
  305. )
  306. converters[camera] = FFMpegConverter(
  307. cam_config.frame_shape[1],
  308. cam_config.frame_shape[0],
  309. width,
  310. cam_config.live.height,
  311. cam_config.live.quality,
  312. )
  313. broadcasters[camera] = BroadcastThread(
  314. camera, converters[camera], websocket_server
  315. )
  316. if config.birdseye.enabled:
  317. converters["birdseye"] = FFMpegConverter(
  318. config.birdseye.width,
  319. config.birdseye.height,
  320. config.birdseye.width,
  321. config.birdseye.height,
  322. config.birdseye.quality,
  323. )
  324. broadcasters["birdseye"] = BroadcastThread(
  325. "birdseye", converters["birdseye"], websocket_server
  326. )
  327. websocket_thread.start()
  328. for t in broadcasters.values():
  329. t.start()
  330. birdseye_manager = BirdsEyeFrameManager(config, frame_manager)
  331. while not stop_event.is_set():
  332. try:
  333. (
  334. camera,
  335. frame_time,
  336. current_tracked_objects,
  337. motion_boxes,
  338. regions,
  339. ) = video_output_queue.get(True, 10)
  340. except queue.Empty:
  341. continue
  342. frame_id = f"{camera}{frame_time}"
  343. frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
  344. # send camera frame to ffmpeg process if websockets are connected
  345. if any(
  346. ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
  347. ):
  348. # write to the converter for the camera if clients are listening to the specific camera
  349. converters[camera].write(frame.tobytes())
  350. # update birdseye if websockets are connected
  351. if config.birdseye.enabled and any(
  352. ws.environ["PATH_INFO"].endswith("birdseye")
  353. for ws in websocket_server.manager
  354. ):
  355. if birdseye_manager.update(
  356. camera,
  357. len(current_tracked_objects),
  358. len(motion_boxes),
  359. frame_time,
  360. frame,
  361. ):
  362. converters["birdseye"].write(birdseye_manager.frame.tobytes())
  363. if camera in previous_frames:
  364. frame_manager.delete(f"{camera}{previous_frames[camera]}")
  365. previous_frames[camera] = frame_time
  366. while not video_output_queue.empty():
  367. (
  368. camera,
  369. frame_time,
  370. current_tracked_objects,
  371. motion_boxes,
  372. regions,
  373. ) = video_output_queue.get(True, 10)
  374. frame_id = f"{camera}{frame_time}"
  375. frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
  376. frame_manager.delete(frame_id)
  377. for c in converters.values():
  378. c.exit()
  379. for b in broadcasters.values():
  380. b.join()
  381. websocket_server.manager.close_all()
  382. websocket_server.manager.stop()
  383. websocket_server.manager.join()
  384. websocket_server.shutdown()
  385. websocket_thread.join()
  386. logger.info("exiting output process...")