
add mqtt and update readme

blakeblackshear, 6 years ago
Parent commit ab1fda3a0e
2 files changed with 75 additions and 12 deletions
  1. README.md (+12 -5)
  2. detect_objects.py (+63 -7)

README.md (+12 -5)

@@ -25,7 +25,11 @@ docker run -it --rm \
 -v <path_to_labelmap.pbtext>:/label_map.pbtext:ro \
 -p 5000:5000 \
 -e RTSP_URL='<rtsp_url>' \
--e REGIONS='<box_size_1>,<x_offset_1>,<y_offset_1>:<box_size_2>,<x_offset_2>,<y_offset_2>' \
+-e REGIONS='<box_size_1>,<x_offset_1>,<y_offset_1>,<min_object_size_1>:<box_size_2>,<x_offset_2>,<y_offset_2>,<min_object_size_2>' \
+-e MQTT_HOST='your.mqtthost.com' \
+-e MQTT_MOTION_TOPIC='cameras/1/motion' \
+-e MQTT_OBJECT_TOPIC='cameras/1/objects' \
+-e MQTT_OBJECT_CLASSES='person,car,truck' \
 realtime-od:latest
 ```
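
Each region in the updated `REGIONS` value now carries a fourth field, `<min_object_size>`, alongside the box size and x/y offsets. The parsing itself happens in `main()` of detect_objects.py and is not part of this diff, so the snippet below is only a sketch of how such a string could be split; the example value and dictionary keys are assumptions, not taken from the repo.

```python
# Sketch only: splitting a REGIONS string in the format shown above.
REGIONS = '300,0,0,100:300,300,0,100'  # illustrative example value

regions = []
for region_string in REGIONS.split(':'):
    box_size, x_offset, y_offset, min_object_size = region_string.split(',')
    regions.append({
        'size': int(box_size),
        'x_offset': int(x_offset),
        'y_offset': int(y_offset),
        'min_object_size': int(min_object_size),
    })

print(regions)
```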
 
@@ -36,11 +40,14 @@ Access the mjpeg stream at http://localhost:5000
 - Use SSDLite models
 
 ## Future improvements
-- [ ] Look for a subset of object types
+- [ ] Add a max size for motion and objects
+- [ ] Filter out detected objects that are not the right size
+- [ ] Change color of bounding box if motion detected
+- [x] Look for a subset of object types
 - [ ] Try and simplify the tensorflow model to just look for the objects we care about
-- [ ] MQTT messages when detected objects change
-- [ ] Implement basic motion detection with opencv and only look for objects in the regions with detected motion
-- [ ] Dynamic changes to processing speed, ie. only process 1FPS unless motion detected
+- [x] MQTT messages when detected objects change
+- [x] Implement basic motion detection with opencv and only look for objects in the regions with detected motion
+- [x] Dynamic changes to processing speed, ie. only process 1FPS unless motion detected
 - [x] Parallel processing to increase FPS
 - [ ] Look into GPU accelerated decoding of RTSP stream
 - [ ] Send video over a socket and use JSMPEG
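
With the MQTT items above checked off, the container publishes a plain 'ON'/'OFF' string to `MQTT_MOTION_TOPIC` and a JSON document keyed by object class to `MQTT_OBJECT_TOPIC`. Below is a minimal subscriber for the example topics from the docker run command, using the same paho-mqtt client API as detect_objects.py and assuming a broker at your.mqtthost.com on the default port 1883.

```python
import json
import paho.mqtt.client as mqtt

# Topic names match the example values in the docker run command above.
MOTION_TOPIC = 'cameras/1/motion'
OBJECT_TOPIC = 'cameras/1/objects'

def on_connect(client, userdata, flags, rc):
    client.subscribe([(MOTION_TOPIC, 0), (OBJECT_TOPIC, 0)])

def on_message(client, userdata, msg):
    if msg.topic == MOTION_TOPIC:
        # motion payload is the plain string 'ON' or 'OFF'
        print('motion:', msg.payload.decode())
    else:
        # object payload is JSON keyed by class name (e.g. person, car, truck)
        print('objects:', json.loads(msg.payload.decode()))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect('your.mqtthost.com', 1883, 60)
client.loop_forever()
```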

detect_objects.py (+63 -7)

@@ -7,12 +7,14 @@ import ctypes
 import logging
 import multiprocessing as mp
 import threading
+import json
 from contextlib import closing
 import numpy as np
 import tensorflow as tf
 from object_detection.utils import label_map_util
 from object_detection.utils import visualization_utils as vis_util
 from flask import Flask, Response, make_response
+import paho.mqtt.client as mqtt
 
 RTSP_URL = os.getenv('RTSP_URL')
 
@@ -22,6 +24,11 @@ PATH_TO_CKPT = '/frozen_inference_graph.pb'
 # List of the strings that is used to add correct label for each box.
 PATH_TO_LABELS = '/label_map.pbtext'
 
+MQTT_HOST = os.getenv('MQTT_HOST')
+MQTT_MOTION_TOPIC = os.getenv('MQTT_MOTION_TOPIC')
+MQTT_OBJECT_TOPIC = os.getenv('MQTT_OBJECT_TOPIC')
+MQTT_OBJECT_CLASSES = os.getenv('MQTT_OBJECT_CLASSES')
+
 # TODO: make dynamic?
 NUM_CLASSES = 90
 
@@ -96,7 +103,50 @@ class ObjectParser(threading.Thread):
                     })
                     object_index += 6
             DETECTED_OBJECTS = detected_objects
-            time.sleep(0.01)
+            time.sleep(0.1)
+class MqttPublisher(threading.Thread):
+    def __init__(self, host, motion_topic, object_topic, object_classes, motion_flags):
+        threading.Thread.__init__(self)
+        self.client = mqtt.Client()
+        self.client.connect(host, 1883, 60)
+        self.client.loop_start()
+        self.motion_topic = motion_topic
+        self.object_topic = object_topic
+        self.object_classes = object_classes
+        self.motion_flags = motion_flags
+
+    def run(self):
+        global DETECTED_OBJECTS
+
+        last_sent_payload = ""
+        last_motion = ""
+        while True:
+            # initialize the payload
+            payload = {}
+            for obj in self.object_classes:
+                payload[obj] = []
+            # loop over detected objects and populate
+            # the payload
+            detected_objects = DETECTED_OBJECTS.copy()
+            for obj in detected_objects:
+                if obj['name'] in self.object_classes:
+                    payload[obj['name']].append(obj)
+            
+            new_payload = json.dumps(payload, sort_keys=True)
+            if new_payload != last_sent_payload:
+                last_sent_payload = new_payload
+                self.client.publish(self.object_topic, new_payload, retain=False)
+
+            motion_status = 'OFF'
+            if any(obj.value == 1 for obj in self.motion_flags):
+                motion_status = 'ON'
+            
+            if motion_status != last_motion:
+                last_motion = motion_status
+                self.client.publish(self.motion_topic, motion_status, retain=False)
+            
+
+            time.sleep(0.1)
 
 def main():
     # Parse selected regions
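
The object payload built by `MqttPublisher` has one key per class named in `MQTT_OBJECT_CLASSES`, each holding the list of matching entries from `DETECTED_OBJECTS`. The per-detection fields come from `ObjectParser` and are not shown in this hunk, so the example below uses placeholder fields purely for illustration.

```python
import json

# Illustrative shape of a message on MQTT_OBJECT_TOPIC when
# MQTT_OBJECT_CLASSES='person,car,truck' and one person is detected.
# The inner fields ('name', 'score') are placeholders; the real keys are
# whatever ObjectParser puts into DETECTED_OBJECTS.
example_payload = {
    'car': [],
    'person': [
        {'name': 'person', 'score': 0.87},
    ],
    'truck': [],
}

print(json.dumps(example_payload, sort_keys=True))
```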
@@ -173,6 +223,11 @@ def main():
     object_parser = ObjectParser([obj['output_array'] for obj in shared_memory_objects])
     object_parser.start()
 
+    mqtt_publisher = MqttPublisher(MQTT_HOST, MQTT_MOTION_TOPIC, MQTT_OBJECT_TOPIC, 
+        MQTT_OBJECT_CLASSES.split(','), 
+        [obj['motion_detected'] for obj in shared_memory_objects])
+    mqtt_publisher.start()
+
     capture_process.start()
     print("capture_process pid ", capture_process.pid)
     for detection_process in detection_processes:
@@ -237,6 +292,7 @@ def main():
     for motion_process in motion_processes:
         motion_process.join()
     object_parser.join()
+    mqtt_publisher.join()
 
 # convert shared memory array into numpy array
 def tonumpyarray(mp_arr):
@@ -272,7 +328,7 @@ def fetch_frames(shared_arr, shared_frame_time, ready_for_frame_flags, frame_sha
                         flag.value = 0
             else:
                 # sleep a little to reduce CPU usage
-                time.sleep(0.01)
+                time.sleep(0.1)
     
     video.release()
 
@@ -297,7 +353,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
         now = datetime.datetime.now().timestamp()
         # if there is no motion detected
         if shared_motion.value == 0:
-            time.sleep(0.01)
+            time.sleep(0.1)
             continue
 
         # if there isnt a new frame ready for processing
@@ -312,7 +368,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
                 print("sleeping because no frames have been available in a while")
             else:
                 # rest a little bit to avoid maxing out the CPU
-                time.sleep(0.01)
+                time.sleep(0.1)
             continue
         
         # we got a valid frame, so reset the timer
@@ -321,7 +377,7 @@ def process_frames(shared_arr, shared_output_arr, shared_frame_time, shared_moti
         # if the frame is more than 0.5 second old, ignore it
         if (now - shared_frame_time.value) > 0.5:
             # rest a little bit to avoid maxing out the CPU
-            time.sleep(0.01)
+            time.sleep(0.1)
             continue
         
         # make a copy of the cropped frame
@@ -362,7 +418,7 @@ def detect_motion(shared_arr, shared_frame_time, ready_for_frame, shared_motion,
                 print("sleeping because no frames have been available in a while")
             else:
                 # rest a little bit to avoid maxing out the CPU
-                time.sleep(0.01)
+                time.sleep(0.1)
             if ready_for_frame.value == 0:
                 ready_for_frame.value = 1
             continue
@@ -375,7 +431,7 @@ def detect_motion(shared_arr, shared_frame_time, ready_for_frame, shared_motion,
             # signal that we need a new frame
             ready_for_frame.value = 1
             # rest a little bit to avoid maxing out the CPU
-            time.sleep(0.01)
+            time.sleep(0.1)
             continue
         
         # make a copy of the cropped frame
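
A recurring detail in the new `MqttPublisher` is that it only re-publishes when the serialized object payload (or the motion string) actually changes. A small self-contained sketch of that change-detection pattern, with `print()` standing in for `client.publish()`:

```python
import json

def publish(topic, message):
    # stand-in for self.client.publish(topic, message, retain=False)
    print(topic, message)

last_sent_payload = ''
snapshots = [
    {'person': [], 'car': []},
    {'person': [], 'car': []},                    # unchanged: skipped
    {'person': [{'name': 'person'}], 'car': []},  # changed: published
]
for payload in snapshots:
    new_payload = json.dumps(payload, sort_keys=True)
    # sort_keys keeps key order stable, so comparing the JSON strings
    # is enough to detect an actual change in the detections
    if new_payload != last_sent_payload:
        last_sent_payload = new_payload
        publish('cameras/1/objects', new_payload)
```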