output.py 15 KB


  1. import datetime
  2. import logging
  3. import math
  4. import multiprocessing as mp
  5. import queue
  6. import signal
  7. import subprocess as sp
  8. import threading
  9. from multiprocessing import shared_memory
  10. from wsgiref.simple_server import make_server
  11. import cv2
  12. import numpy as np
  13. from setproctitle import setproctitle
  14. from ws4py.server.wsgirefserver import (
  15. WebSocketWSGIHandler,
  16. WebSocketWSGIRequestHandler,
  17. WSGIServer,
  18. )
  19. from ws4py.server.wsgiutils import WebSocketWSGIApplication
  20. from ws4py.websocket import WebSocket
  21. from frigate.config import FrigateConfig
  22. from frigate.util import SharedMemoryFrameManager, copy_yuv_to_position, get_yuv_crop
  23. logger = logging.getLogger(__name__)
  24. class FFMpegConverter:
  25. def __init__(self, in_width, in_height, out_width, out_height, quality):
  26. ffmpeg_cmd = f"ffmpeg -f rawvideo -pix_fmt yuv420p -video_size {in_width}x{in_height} -i pipe: -f mpegts -s {out_width}x{out_height} -codec:v mpeg1video -q {quality} -bf 0 pipe:".split(
  27. " "
  28. )
  29. self.process = sp.Popen(
  30. ffmpeg_cmd,
  31. stdout=sp.PIPE,
  32. stderr=sp.DEVNULL,
  33. stdin=sp.PIPE,
  34. start_new_session=True,
  35. )
  36. def write(self, b):
  37. self.process.stdin.write(b)
  38. def read(self, length):
  39. try:
  40. return self.process.stdout.read1(length)
  41. except ValueError:
  42. return False
  43. def exit(self):
  44. self.process.terminate()
  45. try:
  46. self.process.communicate(timeout=30)
  47. except sp.TimeoutExpired:
  48. self.process.kill()
  49. self.process.communicate()
  50. class BroadcastThread(threading.Thread):
  51. def __init__(self, camera, converter, websocket_server):
  52. super(BroadcastThread, self).__init__()
  53. self.camera = camera
  54. self.converter = converter
  55. self.websocket_server = websocket_server
  56. def run(self):
  57. while True:
  58. buf = self.converter.read(65536)
  59. if buf:
  60. manager = self.websocket_server.manager
  61. with manager.lock:
  62. websockets = manager.websockets.copy()
  63. ws_iter = iter(websockets.values())
  64. for ws in ws_iter:
  65. if not ws.terminated and ws.environ["PATH_INFO"].endswith(
  66. self.camera
  67. ):
  68. try:
  69. ws.send(buf, binary=True)
  70. except:
  71. pass
  72. elif self.converter.process.poll() is not None:
  73. break
  74. class BirdsEyeFrameManager:
  75. def __init__(self, config, frame_manager: SharedMemoryFrameManager):
  76. self.config = config
  77. self.mode = config.birdseye.mode
  78. self.frame_manager = frame_manager
  79. width = config.birdseye.width
  80. height = config.birdseye.height
  81. self.frame_shape = (height, width)
  82. self.yuv_shape = (height * 3 // 2, width)
  83. self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8)
  84. # initialize the frame as black and with the frigate logo
  85. self.blank_frame = np.zeros(self.yuv_shape, np.uint8)
  86. self.blank_frame[:] = 128
  87. self.blank_frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]] = 16
  88. self.frame[:] = self.blank_frame
  89. self.cameras = {}
  90. for camera, settings in self.config.cameras.items():
  91. # precalculate the coordinates for all the channels
  92. y, u1, u2, v1, v2 = get_yuv_crop(
  93. settings.frame_shape_yuv,
  94. (
  95. 0,
  96. 0,
  97. settings.frame_shape[1],
  98. settings.frame_shape[0],
  99. ),
  100. )
  101. self.cameras[camera] = {
  102. "last_active_frame": 0.0,
  103. "current_frame": 0.0,
  104. "layout_frame": 0.0,
  105. "channel_dims": {
  106. "y": y,
  107. "u1": u1,
  108. "u2": u2,
  109. "v1": v1,
  110. "v2": v2,
  111. },
  112. }
  113. self.camera_layout = []
  114. self.active_cameras = set()
  115. self.layout_dim = 0
  116. self.last_output_time = 0.0
  117. def clear_frame(self):
  118. logger.debug(f"Clearing the birdseye frame")
  119. self.frame[:] = self.blank_frame
  120. def copy_to_position(self, position, camera=None, frame_time=None):
  121. if camera is None:
  122. frame = None
  123. channel_dims = None
  124. else:
  125. frame = self.frame_manager.get(
  126. f"{camera}{frame_time}", self.config.cameras[camera].frame_shape_yuv
  127. )
  128. channel_dims = self.cameras[camera]["channel_dims"]
  129. copy_yuv_to_position(
  130. self.frame,
  131. self.layout_offsets[position],
  132. self.layout_frame_shape,
  133. frame,
  134. channel_dims,
  135. )
  136. def camera_active(self, object_box_count, motion_box_count):
  137. if self.mode == "continuous":
  138. return True
  139. if self.mode == "motion" and object_box_count + motion_box_count > 0:
  140. return True
  141. if self.mode == "objects" and object_box_count > 0:
  142. return True
  143. def update_frame(self):
  144. # determine how many cameras are tracking objects within the last 30 seconds
  145. active_cameras = set(
  146. [
  147. cam
  148. for cam, cam_data in self.cameras.items()
  149. if cam_data["last_active_frame"] > 0
  150. and cam_data["current_frame"] - cam_data["last_active_frame"] < 30
  151. ]
  152. )
  153. logger.debug(f"Active cameras: {active_cameras}")
  154. # if there are no active cameras
  155. if len(active_cameras) == 0:
  156. # if the layout is already cleared
  157. if len(self.camera_layout) == 0:
  158. return False
  159. # if the layout needs to be cleared
  160. else:
  161. self.camera_layout = []
  162. self.clear_frame()
  163. return True
  164. # calculate layout dimensions
  165. layout_dim = math.ceil(math.sqrt(len(active_cameras)))
  166. logger.debug(f"New calculated layout dimensions: {layout_dim}")
  167. # reset the layout if it needs to be different
  168. if layout_dim != self.layout_dim:
  169. logger.debug(f"Changing layout size from {self.layout_dim} to {layout_dim}")
  170. self.layout_dim = layout_dim
  171. self.camera_layout = [None] * layout_dim * layout_dim
  172. # calculate resolution of each position in the layout
  173. self.layout_frame_shape = (
  174. self.frame_shape[0] // layout_dim, # height
  175. self.frame_shape[1] // layout_dim, # width
  176. )
  177. self.clear_frame()
  178. for cam_data in self.cameras.values():
  179. cam_data["layout_frame"] = 0.0
  180. self.active_cameras = set()
  181. self.layout_offsets = []
  182. # calculate the x and y offset for each position in the layout
  183. for position in range(0, len(self.camera_layout)):
  184. y_offset = self.layout_frame_shape[0] * math.floor(
  185. position / self.layout_dim
  186. )
  187. x_offset = self.layout_frame_shape[1] * (position % self.layout_dim)
  188. self.layout_offsets.append((y_offset, x_offset))
  189. removed_cameras = self.active_cameras.difference(active_cameras)
  190. added_cameras = active_cameras.difference(self.active_cameras)
  191. self.active_cameras = active_cameras
  192. # update each position in the layout
  193. for position, camera in enumerate(self.camera_layout, start=0):
  194. # if this camera was removed, replace it or clear it
  195. if camera in removed_cameras:
  196. # if replacing this camera with a newly added one
  197. if len(added_cameras) > 0:
  198. added_camera = added_cameras.pop()
  199. self.camera_layout[position] = added_camera
  200. self.copy_to_position(
  201. position,
  202. added_camera,
  203. self.cameras[added_camera]["current_frame"],
  204. )
  205. self.cameras[added_camera]["layout_frame"] = self.cameras[
  206. added_camera
  207. ]["current_frame"]
  208. # if removing this camera with no replacement
  209. else:
  210. self.camera_layout[position] = None
  211. self.copy_to_position(position)
  212. removed_cameras.remove(camera)
  213. # if an empty spot and there are cameras to add
  214. elif camera is None and len(added_cameras) > 0:
  215. added_camera = added_cameras.pop()
  216. self.camera_layout[position] = added_camera
  217. self.copy_to_position(
  218. position,
  219. added_camera,
  220. self.cameras[added_camera]["current_frame"],
  221. )
  222. self.cameras[added_camera]["layout_frame"] = self.cameras[added_camera][
  223. "current_frame"
  224. ]
  225. # if not an empty spot and the camera has a newer frame, copy it
  226. elif (
  227. not camera is None
  228. and self.cameras[camera]["current_frame"]
  229. != self.cameras[camera]["layout_frame"]
  230. ):
  231. self.copy_to_position(
  232. position, camera, self.cameras[camera]["current_frame"]
  233. )
  234. self.cameras[camera]["layout_frame"] = self.cameras[camera][
  235. "current_frame"
  236. ]
  237. return True
  238. def update(self, camera, object_count, motion_count, frame_time, frame) -> bool:
  239. # update the last active frame for the camera
  240. self.cameras[camera]["current_frame"] = frame_time
  241. if self.camera_active(object_count, motion_count):
  242. last_active_frame = self.cameras[camera]["last_active_frame"]
  243. # cleanup the old frame
  244. if last_active_frame != 0.0:
  245. frame_id = f"{camera}{last_active_frame}"
  246. self.frame_manager.delete(frame_id)
  247. self.cameras[camera]["last_active_frame"] = frame_time
  248. now = datetime.datetime.now().timestamp()
  249. # limit output to 10 fps
  250. if (now - self.last_output_time) < 1 / 10:
  251. return False
  252. # if the frame was updated or the fps is too low, send frame
  253. if self.update_frame() or (now - self.last_output_time) > 1:
  254. self.last_output_time = now
  255. return True
  256. return False
  257. def output_frames(config: FrigateConfig, video_output_queue):
  258. threading.current_thread().name = f"output"
  259. setproctitle(f"frigate.output")
  260. stop_event = mp.Event()
  261. def receiveSignal(signalNumber, frame):
  262. stop_event.set()
  263. signal.signal(signal.SIGTERM, receiveSignal)
  264. signal.signal(signal.SIGINT, receiveSignal)
  265. frame_manager = SharedMemoryFrameManager()
  266. previous_frames = {}
  267. # start a websocket server on 8082
  268. WebSocketWSGIHandler.http_version = "1.1"
  269. websocket_server = make_server(
  270. "127.0.0.1",
  271. 8082,
  272. server_class=WSGIServer,
  273. handler_class=WebSocketWSGIRequestHandler,
  274. app=WebSocketWSGIApplication(handler_cls=WebSocket),
  275. )
  276. websocket_server.initialize_websockets_manager()
  277. websocket_thread = threading.Thread(target=websocket_server.serve_forever)
  278. converters = {}
  279. broadcasters = {}
  280. for camera, cam_config in config.cameras.items():
  281. converters[camera] = FFMpegConverter(
  282. cam_config.frame_shape[1],
  283. cam_config.frame_shape[0],
  284. cam_config.frame_shape[1],
  285. cam_config.frame_shape[0],
  286. 8,
  287. )
  288. broadcasters[camera] = BroadcastThread(
  289. camera, converters[camera], websocket_server
  290. )
  291. if config.birdseye.enabled:
  292. converters["birdseye"] = FFMpegConverter(
  293. config.birdseye.width,
  294. config.birdseye.height,
  295. config.birdseye.width,
  296. config.birdseye.height,
  297. config.birdseye.quality,
  298. )
  299. broadcasters["birdseye"] = BroadcastThread(
  300. "birdseye", converters["birdseye"], websocket_server
  301. )
  302. websocket_thread.start()
  303. for t in broadcasters.values():
  304. t.start()
  305. birdseye_manager = BirdsEyeFrameManager(config, frame_manager)
  306. while not stop_event.is_set():
  307. try:
  308. (
  309. camera,
  310. frame_time,
  311. current_tracked_objects,
  312. motion_boxes,
  313. regions,
  314. ) = video_output_queue.get(True, 10)
  315. except queue.Empty:
  316. continue
  317. frame_id = f"{camera}{frame_time}"
  318. frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
  319. # send camera frame to ffmpeg process if websockets are connected
  320. if any(
  321. ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
  322. ):
  323. # write to the converter for the camera if clients are listening to the specific camera
  324. converters[camera].write(frame.tobytes())
  325. # update birdseye if websockets are connected
  326. if config.birdseye.enabled and any(
  327. ws.environ["PATH_INFO"].endswith("birdseye")
  328. for ws in websocket_server.manager
  329. ):
  330. if birdseye_manager.update(
  331. camera,
  332. len(current_tracked_objects),
  333. len(motion_boxes),
  334. frame_time,
  335. frame,
  336. ):
  337. converters["birdseye"].write(birdseye_manager.frame.tobytes())
  338. if camera in previous_frames:
  339. # if the birdseye manager still needs this frame, don't delete it
  340. if (
  341. birdseye_manager.cameras[camera]["last_active_frame"]
  342. != previous_frames[camera]
  343. ):
  344. frame_manager.delete(f"{camera}{previous_frames[camera]}")
  345. previous_frames[camera] = frame_time
  346. while not video_output_queue.empty():
  347. (
  348. camera,
  349. frame_time,
  350. current_tracked_objects,
  351. motion_boxes,
  352. regions,
  353. ) = video_output_queue.get(True, 10)
  354. frame_id = f"{camera}{frame_time}"
  355. frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
  356. frame_manager.delete(frame_id)
  357. for c in converters.values():
  358. c.exit()
  359. for b in broadcasters.values():
  360. b.join()
  361. websocket_server.manager.close_all()
  362. websocket_server.manager.stop()
  363. websocket_server.manager.join()
  364. websocket_server.shutdown()
  365. websocket_thread.join()
  366. logger.info("exiting output process...")