@@ -1,23 +1,26 @@
-import datetime
+import sys
+from typing_extensions import runtime
+
+sys.path.append("/lab/frigate")
+
 import json
 import logging
 import multiprocessing as mp
 import os
 import subprocess as sp
 import sys
-from unittest import TestCase, main
 
 import click
+import csv
 import cv2
 import numpy as np
 
-from frigate.config import FRIGATE_CONFIG_SCHEMA, FrigateConfig
+from frigate.config import FrigateConfig
 from frigate.edgetpu import LocalObjectDetector
 from frigate.motion import MotionDetector
 from frigate.object_processing import CameraState
 from frigate.objects import ObjectTracker
 from frigate.util import (
-    DictFrameManager,
     EventsPerSecond,
     SharedMemoryFrameManager,
     draw_box_with_label,
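Note on the new import block: sys.path.append("/lab/frigate") points the script at an in-tree checkout of frigate rather than an installed package, so the frigate.* imports below resolve against the source being developed. A minimal sketch of the same pattern, assuming a hypothetical checkout location:

import sys

# Hypothetical checkout path; the diff hard-codes /lab/frigate.
FRIGATE_CHECKOUT = "/lab/frigate"

# Appending the checkout makes "import frigate.*" resolve against the local
# source tree when the package is not installed in site-packages.
sys.path.append(FRIGATE_CHECKOUT)

from frigate.config import FrigateConfig  # noqa: E402  (import after the path tweak)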
@@ -96,20 +99,22 @@ class ProcessClip:
         ffmpeg_process.wait()
         ffmpeg_process.communicate()
 
-    def process_frames(self, objects_to_track=["person"], object_filters={}):
+    def process_frames(
+        self, object_detector, objects_to_track=["person"], object_filters={}
+    ):
         mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8)
         mask[:] = 255
-        motion_detector = MotionDetector(
-            self.frame_shape, mask, self.camera_config.motion
-        )
+        motion_detector = MotionDetector(self.frame_shape, self.camera_config.motion)
+        motion_detector.save_images = False
 
-        object_detector = LocalObjectDetector(labels="/labelmap.txt")
         object_tracker = ObjectTracker(self.camera_config.detect)
         process_info = {
             "process_fps": mp.Value("d", 0.0),
             "detection_fps": mp.Value("d", 0.0),
             "detection_frame": mp.Value("d", 0.0),
         }
+
+        detection_enabled = mp.Value("d", 1)
         stop_event = mp.Event()
         model_shape = (self.config.model.height, self.config.model.width)
 
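Two behavioural notes on this hunk: the LocalObjectDetector is no longer built inside process_frames (it is created once in process() further down and passed in, so the detector is initialised once per run rather than once per clip), and detection_enabled is the shared flag that the updated process_frames call expects in place of the old mask argument. A minimal standalone sketch of that flag's semantics, using only the multiprocessing calls shown here:

import multiprocessing as mp

# Mirrors detection_enabled = mp.Value("d", 1) above: a process-shared double
# where 1.0 means detection is on and 0.0 means it is off.
detection_enabled = mp.Value("d", 1)

def detection_is_on():
    # Readers simply compare the shared value against 1.
    return detection_enabled.value == 1

# Any process holding the Value can flip it; the lock guards the update.
with detection_enabled.get_lock():
    detection_enabled.value = 0

print(detection_is_on())  # -> False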
@@ -118,6 +123,7 @@ class ProcessClip:
             self.frame_queue,
             self.frame_shape,
             model_shape,
+            self.camera_config.detect,
             self.frame_manager,
             motion_detector,
             object_detector,
@@ -126,25 +132,16 @@ class ProcessClip:
             process_info,
             objects_to_track,
             object_filters,
-            mask,
+            detection_enabled,
             stop_event,
             exit_on_empty=True,
         )
 
-    def top_object(self, debug_path=None):
-        obj_detected = False
-        top_computed_score = 0.0
-
-        def handle_event(name, obj, frame_time):
-            nonlocal obj_detected
-            nonlocal top_computed_score
-            if obj.computed_score > top_computed_score:
-                top_computed_score = obj.computed_score
-            if not obj.false_positive:
-                obj_detected = True
-
-        self.camera_state.on("new", handle_event)
-        self.camera_state.on("update", handle_event)
+    def stats(self, debug_path=None):
+        total_regions = 0
+        total_motion_boxes = 0
+        object_ids = set()
+        total_frames = 0
 
         while not self.detected_objects_queue.empty():
             (
@@ -154,7 +151,8 @@ class ProcessClip:
                 motion_boxes,
                 regions,
             ) = self.detected_objects_queue.get()
-            if not debug_path is None:
+
+            if debug_path:
                 self.save_debug_frame(
                     debug_path, frame_time, current_tracked_objects.values()
                 )
@@ -162,10 +160,22 @@ class ProcessClip:
             self.camera_state.update(
                 frame_time, current_tracked_objects, motion_boxes, regions
             )
+            total_regions += len(regions)
+            total_motion_boxes += len(motion_boxes)
+            for id, obj in self.camera_state.tracked_objects.items():
+                if not obj.false_positive:
+                    object_ids.add(id)
 
-            self.frame_manager.delete(self.camera_state.previous_frame_id)
+            total_frames += 1
 
-        return {"object_detected": obj_detected, "top_score": top_computed_score}
+            self.frame_manager.delete(self.camera_state.previous_frame_id)
+
+        return {
+            "total_regions": total_regions,
+            "total_motion_boxes": total_motion_boxes,
+            "true_positive_objects": len(object_ids),
+            "total_frames": total_frames,
+        }
 
     def save_debug_frame(self, debug_path, frame_time, tracked_objects):
         current_frame = cv2.cvtColor(
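stats() replaces top_object(): instead of wiring an event callback to track a single best score, it drains the queue once and returns plain counters. A small sketch of consuming that dictionary, using only the keys visible in the return statement (the clip names and numbers are invented):

# Shape of ProcessClip.stats() results as used by the CLI below.
results = [
    ("front_door.mp4", {"total_regions": 42, "total_motion_boxes": 31,
                        "true_positive_objects": 1, "total_frames": 120}),
    ("driveway.mp4", {"total_regions": 7, "total_motion_boxes": 7,
                      "true_positive_objects": 0, "total_frames": 95}),
]

# A clip counts as a hit when at least one tracked object survived the
# false positive filter, the same condition used for positive_count below.
hits = [clip for clip, s in results if s["true_positive_objects"] > 0]
print(f"{len(hits)}/{len(results)} clips with true positives")

# Regions per frame gives a rough feel for how much detector work each clip caused.
for clip, s in results:
    print(clip, round(s["total_regions"] / max(s["total_frames"], 1), 2))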
@@ -178,7 +188,6 @@ class ProcessClip:
         for obj in tracked_objects:
             thickness = 2
             color = (0, 0, 175)
-
             if obj["frame_time"] != frame_time:
                 thickness = 1
                 color = (255, 0, 0)
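For reference, the thickness/color switch above encodes staleness: an object whose last update matches the current frame_time gets a thick box in BGR (0, 0, 175), anything older gets a thin box in BGR (255, 0, 0). A standalone sketch of that convention with plain OpenCV calls (the frame size and box coordinates are invented):

import cv2
import numpy as np

frame = np.zeros((480, 640, 3), np.uint8)

def draw_tracked_box(img, box, stale):
    # Same switch as save_debug_frame: stale objects are thin (255, 0, 0),
    # objects updated on this frame are thick (0, 0, 175).
    thickness, color = (1, (255, 0, 0)) if stale else (2, (0, 0, 175))
    x_min, y_min, x_max, y_max = box
    cv2.rectangle(img, (x_min, y_min), (x_max, y_max), color, thickness)

draw_tracked_box(frame, (100, 100, 220, 260), stale=False)
draw_tracked_box(frame, (300, 150, 420, 310), stale=True)
cv2.imwrite("/tmp/debug_overlay.jpg", frame)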
@@ -221,10 +230,9 @@ class ProcessClip:
 @click.command()
 @click.option("-p", "--path", required=True, help="Path to clip or directory to test.")
 @click.option("-l", "--label", default="person", help="Label name to detect.")
-@click.option("-t", "--threshold", default=0.85, help="Threshold value for objects.")
-@click.option("-s", "--scores", default=None, help="File to save csv of top scores")
+@click.option("-o", "--output", default=None, help="File to save csv of data")
 @click.option("--debug-path", default=None, help="Path to output frames for debugging.")
-def process(path, label, threshold, scores, debug_path):
+def process(path, label, output, debug_path):
     clips = []
     if os.path.isdir(path):
         files = os.listdir(path)
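With -t/--threshold and -s/--scores removed, a run needs only a clip path, an optional label, and an optional --output CSV. A hedged sketch of driving the command from Python via click's test runner (the module name process_clip and all paths are assumptions):

from click.testing import CliRunner

from process_clip import process  # assumed module name for this script

runner = CliRunner()
result = runner.invoke(
    process,
    ["-p", "/clips", "-l", "person", "-o", "stats.csv", "--debug-path", "/debug"],
)
print(result.exit_code)
print(result.output)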
@@ -235,51 +243,74 @@ def process(path, label, threshold, scores, debug_path):
 
     json_config = {
         "mqtt": {"host": "mqtt"},
+        "detectors": {"coral": {"type": "edgetpu", "device": "usb"}},
         "cameras": {
             "camera": {
                 "ffmpeg": {
                     "inputs": [
                         {
                             "path": "path.mp4",
-                            "global_args": "",
-                            "input_args": "",
+                            "global_args": "-hide_banner",
+                            "input_args": "-loglevel info",
                             "roles": ["detect"],
                         }
                     ]
                 },
-                "height": 1920,
-                "width": 1080,
+                "rtmp": {"enabled": False},
+                "record": {"enabled": False},
             }
         },
     }
 
+    object_detector = LocalObjectDetector(labels="/labelmap.txt")
+
     results = []
     for c in clips:
         logger.info(c)
         frame_shape = get_frame_shape(c)
 
-        json_config["cameras"]["camera"]["height"] = frame_shape[0]
-        json_config["cameras"]["camera"]["width"] = frame_shape[1]
         json_config["cameras"]["camera"]["ffmpeg"]["inputs"][0]["path"] = c
 
-        config = FrigateConfig(config=FRIGATE_CONFIG_SCHEMA(json_config))
+        frigate_config = FrigateConfig(**json_config)
+        runtime_config = frigate_config.runtime_config
 
-        process_clip = ProcessClip(c, frame_shape, config)
+        process_clip = ProcessClip(c, frame_shape, runtime_config)
         process_clip.load_frames()
-        process_clip.process_frames(objects_to_track=[label])
-
-        results.append((c, process_clip.top_object(debug_path)))
+        process_clip.process_frames(object_detector, objects_to_track=[label])
 
-    if not scores is None:
-        with open(scores, "w") as writer:
-            for result in results:
-                writer.write(f"{result[0]},{result[1]['top_score']}\n")
+        results.append((c, process_clip.stats(debug_path)))
 
-    positive_count = sum(1 for result in results if result[1]["object_detected"])
+    positive_count = sum(
+        1 for result in results if result[1]["true_positive_objects"] > 0
+    )
     print(
         f"Objects were detected in {positive_count}/{len(results)}({positive_count/len(results)*100:.2f}%) clip(s)."
     )
 
+    if output:
+        # now we will open a file for writing
+        data_file = open(output, "w")
+
+        # create the csv writer object
+        csv_writer = csv.writer(data_file)
+
+        # Counter variable used for writing
+        # headers to the CSV file
+        count = 0
+
+        for result in results:
+            if count == 0:
+
+                # Writing headers of CSV file
+                header = ["file"] + list(result[1].keys())
+                csv_writer.writerow(header)
+                count += 1
+
+            # Writing data of CSV file
+            csv_writer.writerow([result[0]] + list(result[1].values()))
+
+        data_file.close()
+
 
 if __name__ == "__main__":
     process()
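The CSV written under --output has a file column followed by one column per stats() key, in the order of the dictionary returned above. A small sketch of reading it back to compare runs, assuming only that layout:

import csv

# stats.csv is whatever path was passed via -o/--output.
with open("stats.csv", newline="") as f:
    rows = list(csv.DictReader(f))

for row in rows:
    # csv.writer stores everything as text, so cast the counters back to int.
    print(
        row["file"],
        int(row["true_positive_objects"]),
        int(row["total_motion_boxes"]),
    )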