output.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. import datetime
  2. import glob
  3. import logging
  4. import math
  5. import multiprocessing as mp
  6. import queue
  7. import signal
  8. import subprocess as sp
  9. import threading
  10. from multiprocessing import shared_memory
  11. from wsgiref.simple_server import make_server
  12. import cv2
  13. import numpy as np
  14. from setproctitle import setproctitle
  15. from ws4py.server.wsgirefserver import (
  16. WebSocketWSGIHandler,
  17. WebSocketWSGIRequestHandler,
  18. WSGIServer,
  19. )
  20. from ws4py.server.wsgiutils import WebSocketWSGIApplication
  21. from ws4py.websocket import WebSocket
  22. from frigate.config import BirdseyeModeEnum, FrigateConfig
  23. from frigate.util import SharedMemoryFrameManager, copy_yuv_to_position, get_yuv_crop
  24. logger = logging.getLogger(__name__)
  25. class FFMpegConverter:
  26. def __init__(self, in_width, in_height, out_width, out_height, quality):
  27. ffmpeg_cmd = f"ffmpeg -f rawvideo -pix_fmt yuv420p -video_size {in_width}x{in_height} -i pipe: -f mpegts -s {out_width}x{out_height} -codec:v mpeg1video -q {quality} -bf 0 pipe:".split(
  28. " "
  29. )
  30. self.process = sp.Popen(
  31. ffmpeg_cmd,
  32. stdout=sp.PIPE,
  33. stderr=sp.DEVNULL,
  34. stdin=sp.PIPE,
  35. start_new_session=True,
  36. )
  37. def write(self, b):
  38. self.process.stdin.write(b)
  39. def read(self, length):
  40. try:
  41. return self.process.stdout.read1(length)
  42. except ValueError:
  43. return False
  44. def exit(self):
  45. self.process.terminate()
  46. try:
  47. self.process.communicate(timeout=30)
  48. except sp.TimeoutExpired:
  49. self.process.kill()
  50. self.process.communicate()
  51. class BroadcastThread(threading.Thread):
  52. def __init__(self, camera, converter, websocket_server):
  53. super(BroadcastThread, self).__init__()
  54. self.camera = camera
  55. self.converter = converter
  56. self.websocket_server = websocket_server
  57. def run(self):
  58. while True:
  59. buf = self.converter.read(65536)
  60. if buf:
  61. manager = self.websocket_server.manager
  62. with manager.lock:
  63. websockets = manager.websockets.copy()
  64. ws_iter = iter(websockets.values())
  65. for ws in ws_iter:
  66. if not ws.terminated and ws.environ["PATH_INFO"].endswith(
  67. self.camera
  68. ):
  69. try:
  70. ws.send(buf, binary=True)
  71. except:
  72. pass
  73. elif self.converter.process.poll() is not None:
  74. break
  75. class BirdsEyeFrameManager:
  76. def __init__(self, config, frame_manager: SharedMemoryFrameManager):
  77. self.config = config
  78. self.mode = config.birdseye.mode
  79. self.frame_manager = frame_manager
  80. width = config.birdseye.width
  81. height = config.birdseye.height
  82. self.frame_shape = (height, width)
  83. self.yuv_shape = (height * 3 // 2, width)
  84. self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8)
  85. # initialize the frame as black and with the frigate logo
  86. self.blank_frame = np.zeros(self.yuv_shape, np.uint8)
  87. self.blank_frame[:] = 128
  88. self.blank_frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]] = 16
  89. # find and copy the logo on the blank frame
  90. logo_files = glob.glob("/opt/frigate/web/apple-touch-icon.*.png")
  91. frigate_logo = None
  92. if len(logo_files) > 0:
  93. frigate_logo = cv2.imread(logo_files[0], cv2.IMREAD_UNCHANGED)
  94. if not frigate_logo is None:
  95. transparent_layer = frigate_logo[:, :, 3]
  96. y_offset = height // 2 - transparent_layer.shape[0] // 2
  97. x_offset = width // 2 - transparent_layer.shape[1] // 2
  98. self.blank_frame[
  99. y_offset : y_offset + transparent_layer.shape[1],
  100. x_offset : x_offset + transparent_layer.shape[0],
  101. ] = transparent_layer
  102. else:
  103. logger.warning("Unable to read frigate logo")
  104. self.frame[:] = self.blank_frame
  105. self.cameras = {}
  106. for camera, settings in self.config.cameras.items():
  107. # precalculate the coordinates for all the channels
  108. y, u1, u2, v1, v2 = get_yuv_crop(
  109. settings.frame_shape_yuv,
  110. (
  111. 0,
  112. 0,
  113. settings.frame_shape[1],
  114. settings.frame_shape[0],
  115. ),
  116. )
  117. self.cameras[camera] = {
  118. "last_active_frame": 0.0,
  119. "current_frame": 0.0,
  120. "layout_frame": 0.0,
  121. "channel_dims": {
  122. "y": y,
  123. "u1": u1,
  124. "u2": u2,
  125. "v1": v1,
  126. "v2": v2,
  127. },
  128. }
  129. self.camera_layout = []
  130. self.active_cameras = set()
  131. self.layout_dim = 0
  132. self.last_output_time = 0.0
  133. def clear_frame(self):
  134. logger.debug(f"Clearing the birdseye frame")
  135. self.frame[:] = self.blank_frame
  136. def copy_to_position(self, position, camera=None, frame_time=None):
  137. if camera is None:
  138. frame = None
  139. channel_dims = None
  140. else:
  141. try:
  142. frame = self.frame_manager.get(
  143. f"{camera}{frame_time}", self.config.cameras[camera].frame_shape_yuv
  144. )
  145. except FileNotFoundError:
  146. # TODO: better frame management would prevent this edge case
  147. logger.warning(
  148. f"Unable to copy frame {camera}{frame_time} to birdseye."
  149. )
  150. return
  151. channel_dims = self.cameras[camera]["channel_dims"]
  152. copy_yuv_to_position(
  153. self.frame,
  154. self.layout_offsets[position],
  155. self.layout_frame_shape,
  156. frame,
  157. channel_dims,
  158. )
  159. def camera_active(self, object_box_count, motion_box_count):
  160. if self.mode == BirdseyeModeEnum.continuous:
  161. return True
  162. if (
  163. self.mode == BirdseyeModeEnum.motion
  164. and object_box_count + motion_box_count > 0
  165. ):
  166. return True
  167. if self.mode == BirdseyeModeEnum.objects and object_box_count > 0:
  168. return True
  169. def update_frame(self):
  170. # determine how many cameras are tracking objects within the last 30 seconds
  171. active_cameras = set(
  172. [
  173. cam
  174. for cam, cam_data in self.cameras.items()
  175. if cam_data["last_active_frame"] > 0
  176. and cam_data["current_frame"] - cam_data["last_active_frame"] < 30
  177. ]
  178. )
  179. # if there are no active cameras
  180. if len(active_cameras) == 0:
  181. # if the layout is already cleared
  182. if len(self.camera_layout) == 0:
  183. return False
  184. # if the layout needs to be cleared
  185. else:
  186. self.camera_layout = []
  187. self.layout_dim = 0
  188. self.clear_frame()
  189. return True
  190. # calculate layout dimensions
  191. layout_dim = math.ceil(math.sqrt(len(active_cameras)))
  192. # reset the layout if it needs to be different
  193. if layout_dim != self.layout_dim:
  194. logger.debug(f"Changing layout size from {self.layout_dim} to {layout_dim}")
  195. self.layout_dim = layout_dim
  196. self.camera_layout = [None] * layout_dim * layout_dim
  197. # calculate resolution of each position in the layout
  198. self.layout_frame_shape = (
  199. self.frame_shape[0] // layout_dim, # height
  200. self.frame_shape[1] // layout_dim, # width
  201. )
  202. self.clear_frame()
  203. for cam_data in self.cameras.values():
  204. cam_data["layout_frame"] = 0.0
  205. self.active_cameras = set()
  206. self.layout_offsets = []
  207. # calculate the x and y offset for each position in the layout
  208. for position in range(0, len(self.camera_layout)):
  209. y_offset = self.layout_frame_shape[0] * math.floor(
  210. position / self.layout_dim
  211. )
  212. x_offset = self.layout_frame_shape[1] * (position % self.layout_dim)
  213. self.layout_offsets.append((y_offset, x_offset))
  214. removed_cameras = self.active_cameras.difference(active_cameras)
  215. added_cameras = active_cameras.difference(self.active_cameras)
  216. self.active_cameras = active_cameras
  217. # update each position in the layout
  218. for position, camera in enumerate(self.camera_layout, start=0):
  219. # if this camera was removed, replace it or clear it
  220. if camera in removed_cameras:
  221. # if replacing this camera with a newly added one
  222. if len(added_cameras) > 0:
  223. added_camera = added_cameras.pop()
  224. self.camera_layout[position] = added_camera
  225. self.copy_to_position(
  226. position,
  227. added_camera,
  228. self.cameras[added_camera]["current_frame"],
  229. )
  230. self.cameras[added_camera]["layout_frame"] = self.cameras[
  231. added_camera
  232. ]["current_frame"]
  233. # if removing this camera with no replacement
  234. else:
  235. self.camera_layout[position] = None
  236. self.copy_to_position(position)
  237. removed_cameras.remove(camera)
  238. # if an empty spot and there are cameras to add
  239. elif camera is None and len(added_cameras) > 0:
  240. added_camera = added_cameras.pop()
  241. self.camera_layout[position] = added_camera
  242. self.copy_to_position(
  243. position,
  244. added_camera,
  245. self.cameras[added_camera]["current_frame"],
  246. )
  247. self.cameras[added_camera]["layout_frame"] = self.cameras[added_camera][
  248. "current_frame"
  249. ]
  250. # if not an empty spot and the camera has a newer frame, copy it
  251. elif (
  252. not camera is None
  253. and self.cameras[camera]["current_frame"]
  254. != self.cameras[camera]["layout_frame"]
  255. ):
  256. self.copy_to_position(
  257. position, camera, self.cameras[camera]["current_frame"]
  258. )
  259. self.cameras[camera]["layout_frame"] = self.cameras[camera][
  260. "current_frame"
  261. ]
  262. return True
  263. def update(self, camera, object_count, motion_count, frame_time, frame) -> bool:
  264. # update the last active frame for the camera
  265. self.cameras[camera]["current_frame"] = frame_time
  266. if self.camera_active(object_count, motion_count):
  267. self.cameras[camera]["last_active_frame"] = frame_time
  268. now = datetime.datetime.now().timestamp()
  269. # limit output to 10 fps
  270. if (now - self.last_output_time) < 1 / 10:
  271. return False
  272. # if the frame was updated or the fps is too low, send frame
  273. if self.update_frame() or (now - self.last_output_time) > 1:
  274. self.last_output_time = now
  275. return True
  276. return False
  277. def output_frames(config: FrigateConfig, video_output_queue):
  278. threading.current_thread().name = f"output"
  279. setproctitle(f"frigate.output")
  280. stop_event = mp.Event()
  281. def receiveSignal(signalNumber, frame):
  282. stop_event.set()
  283. signal.signal(signal.SIGTERM, receiveSignal)
  284. signal.signal(signal.SIGINT, receiveSignal)
  285. frame_manager = SharedMemoryFrameManager()
  286. previous_frames = {}
  287. # start a websocket server on 8082
  288. WebSocketWSGIHandler.http_version = "1.1"
  289. websocket_server = make_server(
  290. "127.0.0.1",
  291. 8082,
  292. server_class=WSGIServer,
  293. handler_class=WebSocketWSGIRequestHandler,
  294. app=WebSocketWSGIApplication(handler_cls=WebSocket),
  295. )
  296. websocket_server.initialize_websockets_manager()
  297. websocket_thread = threading.Thread(target=websocket_server.serve_forever)
  298. converters = {}
  299. broadcasters = {}
  300. for camera, cam_config in config.cameras.items():
  301. converters[camera] = FFMpegConverter(
  302. cam_config.frame_shape[1],
  303. cam_config.frame_shape[0],
  304. cam_config.live.width,
  305. cam_config.live.height,
  306. cam_config.live.quality,
  307. )
  308. broadcasters[camera] = BroadcastThread(
  309. camera, converters[camera], websocket_server
  310. )
  311. if config.birdseye.enabled:
  312. converters["birdseye"] = FFMpegConverter(
  313. config.birdseye.width,
  314. config.birdseye.height,
  315. config.birdseye.width,
  316. config.birdseye.height,
  317. config.birdseye.quality,
  318. )
  319. broadcasters["birdseye"] = BroadcastThread(
  320. "birdseye", converters["birdseye"], websocket_server
  321. )
  322. websocket_thread.start()
  323. for t in broadcasters.values():
  324. t.start()
  325. birdseye_manager = BirdsEyeFrameManager(config, frame_manager)
  326. while not stop_event.is_set():
  327. try:
  328. (
  329. camera,
  330. frame_time,
  331. current_tracked_objects,
  332. motion_boxes,
  333. regions,
  334. ) = video_output_queue.get(True, 10)
  335. except queue.Empty:
  336. continue
  337. frame_id = f"{camera}{frame_time}"
  338. frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
  339. # send camera frame to ffmpeg process if websockets are connected
  340. if any(
  341. ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
  342. ):
  343. # write to the converter for the camera if clients are listening to the specific camera
  344. converters[camera].write(frame.tobytes())
  345. # update birdseye if websockets are connected
  346. if config.birdseye.enabled and any(
  347. ws.environ["PATH_INFO"].endswith("birdseye")
  348. for ws in websocket_server.manager
  349. ):
  350. if birdseye_manager.update(
  351. camera,
  352. len(current_tracked_objects),
  353. len(motion_boxes),
  354. frame_time,
  355. frame,
  356. ):
  357. converters["birdseye"].write(birdseye_manager.frame.tobytes())
  358. if camera in previous_frames:
  359. frame_manager.delete(f"{camera}{previous_frames[camera]}")
  360. previous_frames[camera] = frame_time
  361. while not video_output_queue.empty():
  362. (
  363. camera,
  364. frame_time,
  365. current_tracked_objects,
  366. motion_boxes,
  367. regions,
  368. ) = video_output_queue.get(True, 10)
  369. frame_id = f"{camera}{frame_time}"
  370. frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
  371. frame_manager.delete(frame_id)
  372. for c in converters.values():
  373. c.exit()
  374. for b in broadcasters.values():
  375. b.join()
  376. websocket_server.manager.close_all()
  377. websocket_server.manager.stop()
  378. websocket_server.manager.join()
  379. websocket_server.shutdown()
  380. websocket_thread.join()
  381. logger.info("exiting output process...")