Compare commits

..

20 Commits

Author            SHA1        Message                                            Date
Blake Blackshear  128be72e28  cleanup                                            2020-02-22 09:15:29 -06:00
Blake Blackshear  aaddedc95c  update docs and add back benchmark                 2020-02-22 09:10:37 -06:00
Blake Blackshear  ba919fb439  fix watchdog                                       2020-02-22 09:10:37 -06:00
Blake Blackshear  b1d563f3c4  check avg wait before dropping frames              2020-02-22 09:10:37 -06:00
Blake Blackshear  204d8af5df  fix watchdog restart                               2020-02-22 09:10:37 -06:00
Blake Blackshear  b507a73d79  improve watchdog and coral fps tracking            2020-02-22 09:10:37 -06:00
Blake Blackshear  66eeb8b5cb  dont log http requests                             2020-02-22 09:10:37 -06:00
Blake Blackshear  efa67067c6  cleanup                                            2020-02-22 09:10:37 -06:00
Blake Blackshear  aeb036f1a4  add models and convert speed to ms                 2020-02-22 09:10:37 -06:00
Blake Blackshear  74c528f9dc  add watchdog for camera processes                  2020-02-22 09:10:34 -06:00
Blake Blackshear  f2d54bec43  cleanup old code                                   2020-02-22 09:09:36 -06:00
Blake Blackshear  f07d57741e  add a min_fps option                               2020-02-22 09:06:46 -06:00
Blake Blackshear  2c1ec19f98  check plasma store and consolidate frame drawing   2020-02-22 09:06:46 -06:00
Blake Blackshear  6a9027c002  split into separate processes                      2020-02-22 09:06:43 -06:00
Blake Blackshear  60c15e4419  update tflite to 2.1.0                             2020-02-22 09:05:26 -06:00
Blake Blackshear  03dbf600aa  refactor some classes into new files               2020-02-22 09:05:26 -06:00
Blake Blackshear  fbbb79b31b  tweak process handoff                              2020-02-22 09:05:26 -06:00
Blake Blackshear  496c6bc6c4  Mostly working detection in a separate process     2020-02-22 09:05:26 -06:00
Blake Blackshear  869a81c944  read from ffmpeg                                   2020-02-22 09:05:26 -06:00
Blake Blackshear  5b1884cfb3  WIP: revamp to incorporate motion                  2020-02-22 09:05:26 -06:00
10 changed files with 263 additions and 511 deletions

Dockerfile

@@ -25,6 +25,7 @@ RUN apt -qq update && apt -qq install --no-install-recommends -y \
     imutils \
     scipy \
     && python3.7 -m pip install -U \
+    SharedArray \
     Flask \
     paho-mqtt \
     PyYAML \
@@ -37,9 +38,9 @@ RUN apt -qq update && apt -qq install --no-install-recommends -y \
     && apt -qq install --no-install-recommends -y \
     libedgetpu1-max \
     ## Tensorflow lite (python 3.7 only)
-    && wget -q https://dl.google.com/coral/python/tflite_runtime-2.1.0.post1-cp37-cp37m-linux_x86_64.whl \
-    && python3.7 -m pip install tflite_runtime-2.1.0.post1-cp37-cp37m-linux_x86_64.whl \
-    && rm tflite_runtime-2.1.0.post1-cp37-cp37m-linux_x86_64.whl \
+    && wget -q https://dl.google.com/coral/python/tflite_runtime-2.1.0-cp37-cp37m-linux_x86_64.whl \
+    && python3.7 -m pip install tflite_runtime-2.1.0-cp37-cp37m-linux_x86_64.whl \
+    && rm tflite_runtime-2.1.0-cp37-cp37m-linux_x86_64.whl \
     && rm -rf /var/lib/apt/lists/* \
     && (apt-get autoremove -y; apt-get autoclean -y)

README.md

@@ -16,6 +16,16 @@ You see multiple bounding boxes because it draws bounding boxes from all frames
 [![](http://img.youtube.com/vi/nqHbCtyo4dY/0.jpg)](http://www.youtube.com/watch?v=nqHbCtyo4dY "Frigate")

 ## Getting Started
+Build the container with
+```
+docker build -t frigate .
+```
+
+Models for both CPU and EdgeTPU (Coral) are bundled in the image. You can use your own models with volume mounts:
+- CPU Model: `/cpu_model.tflite`
+- EdgeTPU Model: `/edgetpu_model.tflite`
+- Labels: `/labelmap.txt`
+
 Run the container with
 ```bash
 docker run --rm \
@@ -26,7 +36,7 @@ docker run --rm \
 -v /etc/localtime:/etc/localtime:ro \
 -p 5000:5000 \
 -e FRIGATE_RTSP_PASSWORD='password' \
-blakeblackshear/frigate:stable
+frigate:latest
 ```

 Example docker-compose:
@@ -36,7 +46,7 @@ Example docker-compose:
     restart: unless-stopped
     privileged: true
     shm_size: '1g' # should work for 5-7 cameras
-    image: blakeblackshear/frigate:stable
+    image: frigate:latest
     volumes:
       - /dev/bus/usb:/dev/bus/usb
       - /etc/localtime:/etc/localtime:ro
@@ -117,11 +127,6 @@ sensor:
         value_template: '{{ states.sensor.frigate_debug.attributes["coral"]["inference_speed"] }}'
         unit_of_measurement: 'ms'
 ```
-## Using a custom model
-Models for both CPU and EdgeTPU (Coral) are bundled in the image. You can use your own models with volume mounts:
-- CPU Model: `/cpu_model.tflite`
-- EdgeTPU Model: `/edgetpu_model.tflite`
-- Labels: `/labelmap.txt`

 ## Tips
 - Lower the framerate of the video feed on the camera to reduce the CPU usage for capturing the feed

benchmark.py

@@ -1,79 +1,18 @@
-import os
-from statistics import mean
-import multiprocessing as mp
+import statistics
 import numpy as np
-import datetime
-from frigate.edgetpu import ObjectDetector, EdgeTPUProcess, RemoteObjectDetector, load_labels
+import time
+from frigate.edgetpu import ObjectDetector

-my_frame = np.expand_dims(np.full((300,300,3), 1, np.uint8), axis=0)
-labels = load_labels('/labelmap.txt')
+object_detector = ObjectDetector()

-######
-# Minimal same process runner
-######
-# object_detector = ObjectDetector()
-# tensor_input = np.expand_dims(np.full((300,300,3), 0, np.uint8), axis=0)
-# start = datetime.datetime.now().timestamp()
-# frame_times = []
-# for x in range(0, 1000):
-#     start_frame = datetime.datetime.now().timestamp()
-#     tensor_input[:] = my_frame
-#     detections = object_detector.detect_raw(tensor_input)
-#     parsed_detections = []
-#     for d in detections:
-#         if d[1] < 0.4:
-#             break
-#         parsed_detections.append((
-#             labels[int(d[0])],
-#             float(d[1]),
-#             (d[2], d[3], d[4], d[5])
-#         ))
-#     frame_times.append(datetime.datetime.now().timestamp()-start_frame)
-# duration = datetime.datetime.now().timestamp()-start
-# print(f"Processed for {duration:.2f} seconds.")
-# print(f"Average frame processing time: {mean(frame_times)*1000:.2f}ms")
+frame = np.zeros((300,300,3), np.uint8)
+input_frame = np.expand_dims(frame, axis=0)

-######
-# Separate process runner
-######
-def start(id, num_detections, detection_queue):
-    object_detector = RemoteObjectDetector(str(id), '/labelmap.txt', detection_queue)
-    start = datetime.datetime.now().timestamp()
-    frame_times = []
-    for x in range(0, num_detections):
-        start_frame = datetime.datetime.now().timestamp()
-        detections = object_detector.detect(my_frame)
-        frame_times.append(datetime.datetime.now().timestamp()-start_frame)
-    duration = datetime.datetime.now().timestamp()-start
-    print(f"{id} - Processed for {duration:.2f} seconds.")
-    print(f"{id} - Average frame processing time: {mean(frame_times)*1000:.2f}ms")
+detection_times = []

-edgetpu_process = EdgeTPUProcess()
+for x in range(0, 100):
+    start = time.monotonic()
+    object_detector.detect_raw(input_frame)
+    detection_times.append(time.monotonic()-start)

-# start(1, 1000, edgetpu_process.detect_lock, edgetpu_process.detect_ready, edgetpu_process.frame_ready)
-
-####
-# Multiple camera processes
-####
-camera_processes = []
-for x in range(0, 10):
-    camera_process = mp.Process(target=start, args=(x, 100, edgetpu_process.detection_queue))
-    camera_process.daemon = True
-    camera_processes.append(camera_process)
-
-start = datetime.datetime.now().timestamp()
-
-for p in camera_processes:
-    p.start()
-
-for p in camera_processes:
-    p.join()
-
-duration = datetime.datetime.now().timestamp()-start
-
-print(f"Total - Processed for {duration:.2f} seconds.")
+print(f"Average inference time: {statistics.mean(detection_times)*1000:.2f}ms")

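One detail that separates the two benchmarks above: the short version times with `time.monotonic()`, which never runs backwards, while the longer one uses wall-clock `datetime.datetime.now().timestamp()`, which can jump if NTP or DST adjusts the clock mid-run. A minimal comparison of the two, independent of frigate:

```python
import datetime
import time

# monotonic time is immune to system clock adjustments, so it is the
# safer choice for measuring durations
start_mono = time.monotonic()
# wall-clock time can jump forwards or backwards mid-measurement
start_wall = datetime.datetime.now().timestamp()

time.sleep(0.1)

print(f"monotonic:  {time.monotonic() - start_mono:.3f}s")
print(f"wall clock: {datetime.datetime.now().timestamp() - start_wall:.3f}s")
```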
config/config.example.yml

@@ -3,13 +3,9 @@ web_port: 5000
 mqtt:
   host: mqtt.server.com
   topic_prefix: frigate
   # client_id: frigate # Optional -- set to override default client id of 'frigate' if running multiple instances
-  # user: username # Optional
-  #################
-  ## Environment variables that begin with 'FRIGATE_' may be referenced in {}.
-  ##   password: '{FRIGATE_MQTT_PASSWORD}'
-  #################
-  # password: password # Optional
+  # user: username # Optional -- Uncomment for use
+  # password: password # Optional -- Uncomment for use

 #################
 # Default ffmpeg args. Optional and can be overwritten per camera.
@@ -110,6 +106,13 @@ cameras:
     ################
     take_frame: 1
+    ################
+    # The expected framerate for the camera. Frigate will try and ensure it maintains this framerate
+    # by dropping frames as necessary. Setting this lower than the actual framerate will allow frigate
+    # to process every frame at the expense of realtime processing.
+    ################
+    fps: 5
     ################
     # Configuration for the snapshots in the debug view and mqtt
     ################

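The `fps` comment in the hunk above is implemented by the frame-drop check in `frigate/video.py` further down in this diff: detection is skipped when the measured rate falls below the configured rate while ffmpeg reads are returning quickly (a sign frames are piling up). A standalone sketch of that heuristic, with illustrative numbers (in frigate the inputs come from `EventsPerSecond` and the timed ffmpeg read):

```python
def should_skip_detection(frame_num, measured_fps, expected_fps, read_duration, avg_wait):
    # let the fps estimate settle before making any decisions
    warmed_up = frame_num > 100
    # we are processing more than ~1 fps below the configured rate
    behind = measured_fps < expected_fps - 1
    # the last read returned much faster than the running average, which
    # suggests frames are queueing up faster than we can process them
    frames_queueing = read_duration < 0.5 * avg_wait
    return warmed_up and behind and frames_queueing

# configured for 5 fps but processing at 3.2 with near-instant reads: skip
print(should_skip_detection(500, 3.2, 5, 0.01, 0.2))  # True
# keeping up with the configured rate: run detection
print(should_skip_detection(500, 5.0, 5, 0.20, 0.2))  # False
```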
detect_objects.py

@@ -1,7 +1,3 @@
-import os
-import sys
-import traceback
-import signal
 import cv2
 import time
 import datetime
@@ -12,16 +8,14 @@ import multiprocessing as mp
 import subprocess as sp
 import numpy as np
 import logging
-from flask import Flask, Response, make_response, jsonify, request
+from flask import Flask, Response, make_response, jsonify
 import paho.mqtt.client as mqtt
-from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
+from frigate.video import track_camera
 from frigate.object_processing import TrackedObjectProcessor
 from frigate.util import EventsPerSecond
 from frigate.edgetpu import EdgeTPUProcess

-FRIGATE_VARS = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
-
 with open('/config/config.yml') as f:
     CONFIG = yaml.safe_load(f)
@@ -30,8 +24,6 @@ MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883)
 MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate')
 MQTT_USER = CONFIG.get('mqtt', {}).get('user')
 MQTT_PASS = CONFIG.get('mqtt', {}).get('password')
-if not MQTT_PASS is None:
-    MQTT_PASS = MQTT_PASS.format(**FRIGATE_VARS)
 MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate')

 # Set the default FFmpeg config
@@ -39,7 +31,7 @@ FFMPEG_CONFIG = CONFIG.get('ffmpeg', {})
 FFMPEG_DEFAULT_CONFIG = {
     'global_args': FFMPEG_CONFIG.get('global_args',
         ['-hide_banner','-loglevel','panic']),
     'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args',
         []),
     'input_args': FFMPEG_CONFIG.get('input_args',
         ['-avoid_negative_ts', 'make_zero',
@@ -61,72 +53,44 @@ GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})
 WEB_PORT = CONFIG.get('web_port', 5000)
 DEBUG = (CONFIG.get('debug', '0') == '1')

-def start_plasma_store():
-    plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
-    plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
-    time.sleep(1)
-    rc = plasma_process.poll()
-    if rc is not None:
-        return None
-    return plasma_process
-
 class CameraWatchdog(threading.Thread):
-    def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process):
+    def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, object_processor):
         threading.Thread.__init__(self)
         self.camera_processes = camera_processes
         self.config = config
         self.tflite_process = tflite_process
         self.tracked_objects_queue = tracked_objects_queue
-        self.plasma_process = plasma_process
+        self.object_processor = object_processor

     def run(self):
         time.sleep(10)
         while True:
             # wait a bit before checking
             time.sleep(10)

-            # check the plasma process
-            rc = self.plasma_process.poll()
-            if rc != None:
-                print(f"plasma_process exited unexpectedly with {rc}")
-                self.plasma_process = start_plasma_store()
-
-            # check the detection process
-            detection_start = self.tflite_process.detection_start.value
-            if (detection_start > 0.0 and
-                datetime.datetime.now().timestamp() - detection_start > 10):
-                print("Detection appears to be stuck. Restarting detection process")
-                self.tflite_process.start_or_restart()
-            elif not self.tflite_process.detect_process.is_alive():
-                print("Detection appears to have stopped. Restarting detection process")
-                self.tflite_process.start_or_restart()
-
-            # check the camera processes
             for name, camera_process in self.camera_processes.items():
                 process = camera_process['process']
+                if (datetime.datetime.now().timestamp() - self.object_processor.get_current_frame_time(name)) > 30:
+                    print(f"Last frame for {name} is more than 30 seconds old...")
+                    if process.is_alive():
+                        process.terminate()
+                        print("Waiting for process to exit gracefully...")
+                        process.join(timeout=30)
+                        if process.exitcode is None:
+                            print("Process didnt exit. Force killing...")
+                            process.kill()
+                            process.join()
                 if not process.is_alive():
-                    print(f"Track process for {name} is not alive. Starting again...")
-                    camera_process['process_fps'].value = 0.0
-                    camera_process['detection_fps'].value = 0.0
-                    camera_process['read_start'].value = 0.0
-                    process = mp.Process(target=track_camera, args=(name, self.config[name], GLOBAL_OBJECT_CONFIG, camera_process['frame_queue'],
-                        camera_process['frame_shape'], self.tflite_process.detection_queue, self.tracked_objects_queue,
-                        camera_process['process_fps'], camera_process['detection_fps'],
-                        camera_process['read_start'], camera_process['detection_frame']))
+                    print(f"Process for {name} is not alive. Starting again...")
+                    camera_process['fps'].value = float(self.config[name]['fps'])
+                    camera_process['skipped_fps'].value = 0.0
+                    process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
+                        self.tflite_process.detect_lock, self.tflite_process.detect_ready, self.tflite_process.frame_ready, self.tracked_objects_queue,
+                        camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps']))
                     process.daemon = True
                     camera_process['process'] = process
                     process.start()
-                    print(f"Track process started for {name}: {process.pid}")
-                if not camera_process['capture_thread'].is_alive():
-                    frame_shape = camera_process['frame_shape']
-                    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-                    ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
-                    camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
-                        camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'])
-                    camera_capture.start()
-                    camera_process['ffmpeg_process'] = ffmpeg_process
-                    camera_process['capture_thread'] = camera_capture
+                    print(f"Camera_process started for {name}: {process.pid}")

 def main():
     # connect to mqtt and setup last will
@@ -151,7 +115,14 @@ def main():
     client.connect(MQTT_HOST, MQTT_PORT, 60)
     client.loop_start()

-    plasma_process = start_plasma_store()
+    # start plasma store
+    plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
+    plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
+    time.sleep(1)
+    rc = plasma_process.poll()
+    if rc is not None:
+        raise RuntimeError("plasma_store exited unexpectedly with "
+                           "code %d" % (rc,))

     ##
     # Setup config defaults for cameras
@@ -162,7 +133,7 @@ def main():
         }

     # Queue for cameras to push tracked objects to
-    tracked_objects_queue = mp.SimpleQueue()
+    tracked_objects_queue = mp.Queue()

     # Start the shared tflite process
     tflite_process = EdgeTPUProcess()
@@ -170,56 +141,15 @@ def main():
     # start the camera processes
     camera_processes = {}
     for name, config in CONFIG['cameras'].items():
-        # Merge the ffmpeg config with the global config
-        ffmpeg = config.get('ffmpeg', {})
-        ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
-        ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
-        ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
-        ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
-        ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
-        ffmpeg_cmd = (['ffmpeg'] +
-                ffmpeg_global_args +
-                ffmpeg_hwaccel_args +
-                ffmpeg_input_args +
-                ['-i', ffmpeg_input] +
-                ffmpeg_output_args +
-                ['pipe:'])
-
-        if 'width' in config and 'height' in config:
-            frame_shape = (config['height'], config['width'], 3)
-        else:
-            frame_shape = get_frame_shape(ffmpeg_input)
-
-        frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-        take_frame = config.get('take_frame', 1)
-        detection_frame = mp.Value('d', 0.0)
-
-        ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
-        frame_queue = mp.SimpleQueue()
-        camera_fps = EventsPerSecond()
-        camera_fps.start()
-        camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame)
-        camera_capture.start()
-
         camera_processes[name] = {
-            'camera_fps': camera_fps,
-            'take_frame': take_frame,
-            'process_fps': mp.Value('d', 0.0),
+            'fps': mp.Value('d', float(config['fps'])),
+            'skipped_fps': mp.Value('d', 0.0),
             'detection_fps': mp.Value('d', 0.0),
-            'detection_frame': detection_frame,
-            'read_start': mp.Value('d', 0.0),
-            'ffmpeg_process': ffmpeg_process,
-            'ffmpeg_cmd': ffmpeg_cmd,
-            'frame_queue': frame_queue,
-            'frame_shape': frame_shape,
-            'capture_thread': camera_capture
+            'last_frame': datetime.datetime.now().timestamp()
         }

-        camera_process = mp.Process(target=track_camera, args=(name, config, GLOBAL_OBJECT_CONFIG, frame_queue, frame_shape,
-            tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['process_fps'],
-            camera_processes[name]['detection_fps'],
-            camera_processes[name]['read_start'], camera_processes[name]['detection_frame']))
+        camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
+            tflite_process.detect_lock, tflite_process.detect_ready, tflite_process.frame_ready, tracked_objects_queue,
+            camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps']))
         camera_process.daemon = True
         camera_processes[name]['process'] = camera_process
@@ -230,7 +160,7 @@ def main():
     object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue)
     object_processor.start()

-    camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process)
+    camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, object_processor)
     camera_watchdog.start()

     # create a flask app that encodes frames a mjpeg on demand
@@ -243,23 +173,6 @@ def main():
         # return a healh
         return "Frigate is running. Alive and healthy!"

-    @app.route('/debug/stack')
-    def processor_stack():
-        frame = sys._current_frames().get(object_processor.ident, None)
-        if frame:
-            return "<br>".join(traceback.format_stack(frame)), 200
-        else:
-            return "no frame found", 200
-
-    @app.route('/debug/print_stack')
-    def print_stack():
-        pid = int(request.args.get('pid', 0))
-        if pid == 0:
-            return "missing pid", 200
-        else:
-            os.kill(pid, signal.SIGUSR1)
-            return "check logs", 200
-
     @app.route('/debug/stats')
     def stats():
         stats = {}
@@ -268,32 +181,17 @@ def main():
         for name, camera_stats in camera_processes.items():
             total_detection_fps += camera_stats['detection_fps'].value
-            capture_thread = camera_stats['capture_thread']
             stats[name] = {
-                'camera_fps': round(capture_thread.fps.eps(), 2),
-                'process_fps': round(camera_stats['process_fps'].value, 2),
-                'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
-                'detection_fps': round(camera_stats['detection_fps'].value, 2),
-                'read_start': camera_stats['read_start'].value,
-                'pid': camera_stats['process'].pid,
-                'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
-                'frame_info': {
-                    'read': capture_thread.current_frame,
-                    'detect': camera_stats['detection_frame'].value,
-                    'process': object_processor.camera_data[name]['current_frame_time']
-                }
+                'fps': camera_stats['fps'].value,
+                'skipped_fps': camera_stats['skipped_fps'].value,
+                'detection_fps': camera_stats['detection_fps'].value
             }

         stats['coral'] = {
-            'fps': round(total_detection_fps, 2),
-            'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2),
-            'detection_start': tflite_process.detection_start.value,
-            'pid': tflite_process.detect_process.pid
+            'fps': total_detection_fps,
+            'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2)
         }

-        rc = camera_watchdog.plasma_process.poll()
-        stats['plasma_store_rc'] = rc
-
         return jsonify(stats)

     @app.route('/<camera_name>/<label>/best.jpg')
@@ -312,35 +210,28 @@ def main():
     @app.route('/<camera_name>')
     def mjpeg_feed(camera_name):
-        fps = int(request.args.get('fps', '3'))
-        height = int(request.args.get('h', '360'))
         if camera_name in CONFIG['cameras']:
             # return a multipart response
-            return Response(imagestream(camera_name, fps, height),
+            return Response(imagestream(camera_name),
                 mimetype='multipart/x-mixed-replace; boundary=frame')
         else:
             return "Camera named {} not found".format(camera_name), 404

-    def imagestream(camera_name, fps, height):
+    def imagestream(camera_name):
         while True:
-            # max out at specified FPS
-            time.sleep(1/fps)
+            # max out at 1 FPS
+            time.sleep(1)
             frame = object_processor.get_current_frame(camera_name)
             if frame is None:
-                frame = np.zeros((height,int(height*16/9),3), np.uint8)
-
-            width = int(height*frame.shape[1]/frame.shape[0])
-            frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_LINEAR)
+                frame = np.zeros((720,1280,3), np.uint8)
             frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
             ret, jpg = cv2.imencode('.jpg', frame)
             yield (b'--frame\r\n'
                 b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

     app.run(host='0.0.0.0', port=WEB_PORT, debug=False)

-    object_processor.join()
+    camera_watchdog.join()

     plasma_process.terminate()

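Both sides of the watchdog above use the same terminate/join/kill escalation before starting a replacement process. A self-contained sketch of that pattern (the 30-second timeout matches the diff; `_worker` is a hypothetical stand-in for a camera process):

```python
import multiprocessing as mp
import time

def _worker():
    # hypothetical stand-in for a track_camera process
    while True:
        time.sleep(1)

def restart_process(process, target):
    """Terminate politely, escalate to kill, then start a replacement,
    mirroring the CameraWatchdog logic in the diff above."""
    if process.is_alive():
        process.terminate()
        print("Waiting for process to exit gracefully...")
        process.join(timeout=30)
        if process.exitcode is None:
            print("Process didnt exit. Force killing...")
            process.kill()  # SIGKILL; Process.kill() requires Python 3.7+
            process.join()
    replacement = mp.Process(target=target, daemon=True)
    replacement.start()
    return replacement

if __name__ == '__main__':
    p = mp.Process(target=_worker, daemon=True)
    p.start()
    p = restart_process(p, _worker)
    print(f"replacement pid: {p.pid}")
```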
frigate/edgetpu.py

@@ -1,12 +1,11 @@
 import os
 import datetime
-import hashlib
 import multiprocessing as mp
 import numpy as np
-import pyarrow.plasma as plasma
+import SharedArray as sa
 import tflite_runtime.interpreter as tflite
 from tflite_runtime.interpreter import load_delegate
-from frigate.util import EventsPerSecond, listen
+from frigate.util import EventsPerSecond

 def load_labels(path, encoding='utf-8'):
     """Loads labels from file (with or without index numbers).
@@ -61,82 +60,77 @@ class ObjectDetector():

         return detections

-def run_detector(detection_queue, avg_speed, start):
-    print(f"Starting detection process: {os.getpid()}")
-    listen()
-    plasma_client = plasma.connect("/tmp/plasma")
-    object_detector = ObjectDetector()
-
-    while True:
-        object_id_str = detection_queue.get()
-        object_id_hash = hashlib.sha1(str.encode(object_id_str))
-        object_id = plasma.ObjectID(object_id_hash.digest())
-        object_id_out = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{object_id_str}")).digest())
-        input_frame = plasma_client.get(object_id, timeout_ms=0)
-
-        if input_frame is plasma.ObjectNotAvailable:
-            continue
-
-        # detect and put the output in the plasma store
-        start.value = datetime.datetime.now().timestamp()
-        plasma_client.put(object_detector.detect_raw(input_frame), object_id_out)
-        duration = datetime.datetime.now().timestamp()-start.value
-        start.value = 0.0
-
-        avg_speed.value = (avg_speed.value*9 + duration)/10
-
 class EdgeTPUProcess():
     def __init__(self):
-        self.detection_queue = mp.SimpleQueue()
-        self.avg_inference_speed = mp.Value('d', 0.01)
-        self.detection_start = mp.Value('d', 0.0)
-        self.detect_process = None
-        self.start_or_restart()
-
-    def start_or_restart(self):
-        self.detection_start.value = 0.0
-        if (not self.detect_process is None) and self.detect_process.is_alive():
-            self.detect_process.terminate()
-            print("Waiting for detection process to exit gracefully...")
-            self.detect_process.join(timeout=30)
-            if self.detect_process.exitcode is None:
-                print("Detection process didnt exit. Force killing...")
-                self.detect_process.kill()
-                self.detect_process.join()
-        self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start))
+        # TODO: see if we can use the plasma store with a queue and maintain the same speeds
+        try:
+            sa.delete("frame")
+        except:
+            pass
+        try:
+            sa.delete("detections")
+        except:
+            pass
+
+        self.input_frame = sa.create("frame", shape=(1,300,300,3), dtype=np.uint8)
+        self.detections = sa.create("detections", shape=(20,6), dtype=np.float32)
+
+        self.detect_lock = mp.Lock()
+        self.detect_ready = mp.Event()
+        self.frame_ready = mp.Event()
+        self.avg_inference_speed = mp.Value('d', 0.01)
+
+        def run_detector(detect_ready, frame_ready, avg_speed):
+            print(f"Starting detection process: {os.getpid()}")
+            object_detector = ObjectDetector()
+            input_frame = sa.attach("frame")
+            detections = sa.attach("detections")
+
+            while True:
+                # wait until a frame is ready
+                frame_ready.wait()
+                start = datetime.datetime.now().timestamp()
+                # signal that the process is busy
+                frame_ready.clear()
+                detections[:] = object_detector.detect_raw(input_frame)
+                # signal that the process is ready to detect
+                detect_ready.set()
+                duration = datetime.datetime.now().timestamp()-start
+                avg_speed.value = (avg_speed.value*9 + duration)/10
+
+        self.detect_process = mp.Process(target=run_detector, args=(self.detect_ready, self.frame_ready, self.avg_inference_speed))
         self.detect_process.daemon = True
         self.detect_process.start()

 class RemoteObjectDetector():
-    def __init__(self, name, labels, detection_queue):
+    def __init__(self, labels, detect_lock, detect_ready, frame_ready):
         self.labels = load_labels(labels)
-        self.name = name
+        self.input_frame = sa.attach("frame")
+        self.detections = sa.attach("detections")
         self.fps = EventsPerSecond()
-        self.plasma_client = plasma.connect("/tmp/plasma")
-        self.detection_queue = detection_queue
+        self.detect_lock = detect_lock
+        self.detect_ready = detect_ready
+        self.frame_ready = frame_ready

     def detect(self, tensor_input, threshold=.4):
         detections = []
-
-        now = f"{self.name}-{str(datetime.datetime.now().timestamp())}"
-        object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest())
-        object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest())
-        self.plasma_client.put(tensor_input, object_id_frame)
-        self.detection_queue.put(now)
-        raw_detections = self.plasma_client.get(object_id_detections, timeout_ms=10000)
-
-        if raw_detections is plasma.ObjectNotAvailable:
-            self.plasma_client.delete([object_id_frame])
-            return detections
-
-        for d in raw_detections:
-            if d[1] < threshold:
-                break
-            detections.append((
-                self.labels[int(d[0])],
-                float(d[1]),
-                (d[2], d[3], d[4], d[5])
-            ))
-        self.plasma_client.delete([object_id_frame, object_id_detections])
+        with self.detect_lock:
+            self.input_frame[:] = tensor_input
+            # unset detections and signal that a frame is ready
+            self.detect_ready.clear()
+            self.frame_ready.set()
+            # wait until the detection process is finished,
+            self.detect_ready.wait()
+            for d in self.detections:
+                if d[1] < threshold:
+                    break
+                detections.append((
+                    self.labels[int(d[0])],
+                    float(d[1]),
+                    (d[2], d[3], d[4], d[5])
+                ))
         self.fps.update()
         return detections

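The SharedArray side of this diff synchronizes the camera and detection processes with two `multiprocessing.Event`s: the caller writes the input, clears `detect_ready`, sets `frame_ready`, then blocks until the detector sets `detect_ready`; the detector does the mirror image. A stripped-down sketch of that rendezvous, with the shared arrays replaced by a `Value` so it runs anywhere:

```python
import multiprocessing as mp

def detector(frame_ready, detect_ready, slot):
    for _ in range(3):
        frame_ready.wait()           # wait until a frame is ready
        frame_ready.clear()          # signal that the detector is busy
        slot.value = slot.value * 2  # stand-in for detect_raw()
        detect_ready.set()           # signal that results are ready

if __name__ == '__main__':
    frame_ready, detect_ready = mp.Event(), mp.Event()
    slot = mp.Value('i', 0)
    mp.Process(target=detector, args=(frame_ready, detect_ready, slot), daemon=True).start()
    for i in range(1, 4):
        slot.value = i       # "write the frame" into shared memory
        detect_ready.clear()
        frame_ready.set()
        detect_ready.wait()  # block until the detector finishes
        print(slot.value)    # 2, then 4, then 6
```

The `avg_speed.value = (avg_speed.value*9 + duration)/10` line that both versions share is an exponential moving average with a 0.1 smoothing factor, so a single slow inference nudges the reported speed instead of spiking it.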
frigate/object_processing.py

@@ -1,7 +1,6 @@
 import json
 import hashlib
 import datetime
-import time
 import copy
 import cv2
 import threading
@@ -9,8 +8,9 @@ import numpy as np
 from collections import Counter, defaultdict
 import itertools
 import pyarrow.plasma as plasma
+import SharedArray as sa
 import matplotlib.pyplot as plt
-from frigate.util import draw_box_with_label, PlasmaManager
+from frigate.util import draw_box_with_label
 from frigate.edgetpu import load_labels

 PATH_TO_LABELS = '/labelmap.txt'
@@ -29,15 +29,13 @@ class TrackedObjectProcessor(threading.Thread):
         self.client = client
         self.topic_prefix = topic_prefix
         self.tracked_objects_queue = tracked_objects_queue
+        self.plasma_client = plasma.connect("/tmp/plasma")
         self.camera_data = defaultdict(lambda: {
             'best_objects': {},
             'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')),
             'tracked_objects': {},
-            'current_frame': np.zeros((720,1280,3), np.uint8),
-            'current_frame_time': 0.0,
-            'object_id': None
+            'current_frame_time': datetime.datetime.now().timestamp()
         })
-        self.plasma_client = PlasmaManager()

     def get_best(self, camera, label):
         if label in self.camera_data[camera]['best_objects']:
@@ -47,6 +45,9 @@ class TrackedObjectProcessor(threading.Thread):
     def get_current_frame(self, camera):
         return self.camera_data[camera]['current_frame']

+    def get_current_frame_time(self, camera):
+        return self.camera_data[camera]['current_frame_time']
+
     def run(self):
         while True:
@@ -56,43 +57,40 @@ class TrackedObjectProcessor(threading.Thread):
             best_objects = self.camera_data[camera]['best_objects']
             current_object_status = self.camera_data[camera]['object_status']
             self.camera_data[camera]['tracked_objects'] = tracked_objects
-            self.camera_data[camera]['current_frame_time'] = frame_time

             ###
             # Draw tracked objects on the frame
             ###
-            current_frame = self.plasma_client.get(f"{camera}{frame_time}")
-
-            if not current_frame is plasma.ObjectNotAvailable:
-                # draw the bounding boxes on the frame
-                for obj in tracked_objects.values():
-                    thickness = 2
-                    color = COLOR_MAP[obj['label']]
-
-                    if obj['frame_time'] != frame_time:
-                        thickness = 1
-                        color = (255,0,0)
-
-                    # draw the bounding boxes on the frame
-                    box = obj['box']
-                    draw_box_with_label(current_frame, box[0], box[1], box[2], box[3], obj['label'], f"{int(obj['score']*100)}% {int(obj['area'])}", thickness=thickness, color=color)
-                    # draw the regions on the frame
-                    region = obj['region']
-                    cv2.rectangle(current_frame, (region[0], region[1]), (region[2], region[3]), (0,255,0), 1)
-
-                if config['snapshots']['show_timestamp']:
-                    time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
-                    cv2.putText(current_frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
-
-                ###
-                # Set the current frame
-                ###
-                self.camera_data[camera]['current_frame'] = current_frame
-
-                # delete the previous frame from the plasma store and update the object id
-                if not self.camera_data[camera]['object_id'] is None:
-                    self.plasma_client.delete(self.camera_data[camera]['object_id'])
-                self.camera_data[camera]['object_id'] = f"{camera}{frame_time}"
+            object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}"))
+            object_id_bytes = object_id_hash.digest()
+            object_id = plasma.ObjectID(object_id_bytes)
+            current_frame = self.plasma_client.get(object_id)
+
+            # draw the bounding boxes on the frame
+            for obj in tracked_objects.values():
+                thickness = 2
+                color = COLOR_MAP[obj['label']]
+
+                if obj['frame_time'] != frame_time:
+                    thickness = 1
+                    color = (255,0,0)
+
+                # draw the bounding boxes on the frame
+                box = obj['box']
+                draw_box_with_label(current_frame, box[0], box[1], box[2], box[3], obj['label'], f"{int(obj['score']*100)}% {int(obj['area'])}", thickness=thickness, color=color)
+                # draw the regions on the frame
+                region = obj['region']
+                cv2.rectangle(current_frame, (region[0], region[1]), (region[2], region[3]), (0,255,0), 1)
+
+            if config['snapshots']['show_timestamp']:
+                time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
+                cv2.putText(current_frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
+
+            ###
+            # Set the current frame as ready
+            ###
+            self.camera_data[camera]['current_frame'] = current_frame
+            self.camera_data[camera]['current_frame_time'] = frame_time

             ###
             # Maintain the highest scoring recent object and frame for each label
@@ -106,10 +104,10 @@ class TrackedObjectProcessor(threading.Thread):
                 # if the object is a higher score than the current best score
                 # or the current object is more than 1 minute old, use the new object
                 if obj['score'] > best_objects[obj['label']]['score'] or (now - best_objects[obj['label']]['frame_time']) > 60:
-                    obj['frame'] = np.copy(self.camera_data[camera]['current_frame'])
+                    obj['frame'] = np.copy(current_frame)
                     best_objects[obj['label']] = obj
                 else:
-                    obj['frame'] = np.copy(self.camera_data[camera]['current_frame'])
+                    obj['frame'] = np.copy(current_frame)
                     best_objects[obj['label']] = obj

             ###
@@ -144,4 +142,4 @@ class TrackedObjectProcessor(threading.Thread):
         ret, jpg = cv2.imencode('.jpg', best_frame)
         if ret:
             jpg_bytes = jpg.tobytes()
             self.client.publish(f"{self.topic_prefix}/{camera}/{obj_name}/snapshot", jpg_bytes, retain=True)

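On both sides of this diff a frame's plasma object ID is derived deterministically from the camera name and frame time, so the consumer can locate a frame without the producer passing it a handle: the minus side hides the derivation inside `PlasmaManager`, the plus side does it inline. The derivation itself, extracted as a sketch (assumes an older pyarrow that still ships the plasma store):

```python
import hashlib

import pyarrow.plasma as plasma  # removed from pyarrow in later releases

def frame_object_id(camera: str, frame_time: float) -> plasma.ObjectID:
    """Derive the plasma ObjectID for a camera frame as in the diff above.
    sha1 digests are exactly the 20 bytes an ObjectID requires."""
    return plasma.ObjectID(hashlib.sha1(str.encode(f"{camera}{frame_time}")).digest())
```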
frigate/objects.py

@@ -49,6 +49,14 @@ class ObjectTracker():
             obj['history'] = [entry]

     def match_and_update(self, frame_time, new_objects):
+        if len(new_objects) == 0:
+            for id in list(self.tracked_objects.keys()):
+                if self.disappeared[id] >= self.max_disappeared:
+                    self.deregister(id)
+                else:
+                    self.disappeared[id] += 1
+            return
+
         # group by name
         new_object_groups = defaultdict(lambda: [])
         for obj in new_objects:
@@ -61,18 +69,6 @@ class ObjectTracker():
                 'frame_time': frame_time
             })

-        # update any tracked objects with labels that are not
-        # seen in the current objects and deregister if needed
-        for obj in list(self.tracked_objects.values()):
-            if not obj['label'] in new_object_groups:
-                if self.disappeared[obj['id']] >= self.max_disappeared:
-                    self.deregister(obj['id'])
-                else:
-                    self.disappeared[obj['id']] += 1
-
-        if len(new_objects) == 0:
-            return
-
         # track objects for each label type
         for label, group in new_object_groups.items():
             current_objects = [o for o in self.tracked_objects.values() if o['label'] == label]

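The hunks above only move when disappeared objects are deregistered; the bookkeeping itself is unchanged: a per-id miss counter that drops an object once it has been missed `max_disappeared` times in a row. A toy illustration of that bookkeeping (hypothetical class, not frigate's `ObjectTracker`):

```python
from collections import defaultdict

class DisappearanceTracker:
    def __init__(self, max_disappeared=10):
        self.max_disappeared = max_disappeared
        self.tracked_objects = {}            # id -> object data
        self.disappeared = defaultdict(int)  # id -> consecutive misses

    def mark_seen(self, id, data):
        self.tracked_objects[id] = data
        self.disappeared[id] = 0             # reset the miss counter

    def mark_missed(self, id):
        if self.disappeared[id] >= self.max_disappeared:
            # deregister: forget the object entirely
            del self.tracked_objects[id]
            del self.disappeared[id]
        else:
            self.disappeared[id] += 1
```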
frigate/util.py

@@ -1,14 +1,9 @@
 import datetime
-import time
-import signal
-import traceback
 import collections
 import numpy as np
 import cv2
 import threading
 import matplotlib.pyplot as plt
-import hashlib
-import pyarrow.plasma as plasma

 def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
     if color is None:
@@ -132,52 +127,3 @@ class EventsPerSecond:
         now = datetime.datetime.now().timestamp()
         seconds = min(now-self._start, last_n_seconds)
         return len([t for t in self._timestamps if t > (now-last_n_seconds)]) / seconds
-
-def print_stack(sig, frame):
-    traceback.print_stack(frame)
-
-def listen():
-    signal.signal(signal.SIGUSR1, print_stack)
-
-class PlasmaManager:
-    def __init__(self):
-        self.connect()
-
-    def connect(self):
-        while True:
-            try:
-                self.plasma_client = plasma.connect("/tmp/plasma")
-                return
-            except:
-                print(f"TrackedObjectProcessor: unable to connect plasma client")
-                time.sleep(10)
-
-    def get(self, name, timeout_ms=0):
-        object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
-        while True:
-            try:
-                return self.plasma_client.get(object_id, timeout_ms=timeout_ms)
-            except:
-                self.connect()
-                time.sleep(1)
-
-    def put(self, name, obj):
-        object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
-        while True:
-            try:
-                self.plasma_client.put(obj, object_id)
-                return
-            except Exception as e:
-                print(f"Failed to put in plasma: {e}")
-                self.connect()
-                time.sleep(1)
-
-    def delete(self, name):
-        object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
-        while True:
-            try:
-                self.plasma_client.delete([object_id])
-                return
-            except:
-                self.connect()
-                time.sleep(1)

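Only the tail of `EventsPerSecond.eps()` is visible in the hunk above: record a timestamp per event, then count how many fall inside the last N seconds. A self-contained sketch of that pattern (a plausible reconstruction, not the exact frigate class):

```python
import datetime

class EventsPerSecondSketch:
    def __init__(self, max_events=1000):
        self._start = datetime.datetime.now().timestamp()
        self._timestamps = []
        self._max_events = max_events

    def update(self):
        self._timestamps.append(datetime.datetime.now().timestamp())
        # keep the buffer bounded
        self._timestamps = self._timestamps[-self._max_events:]

    def eps(self, last_n_seconds=10):
        now = datetime.datetime.now().timestamp()
        # don't divide by a window longer than we have been alive
        seconds = min(now - self._start, last_n_seconds)
        if seconds <= 0:
            return 0.0
        return len([t for t in self._timestamps if t > (now - last_n_seconds)]) / seconds
```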
frigate/video.py

@@ -5,19 +5,22 @@ import cv2
 import queue
 import threading
 import ctypes
-import pyarrow.plasma as plasma
 import multiprocessing as mp
 import subprocess as sp
 import numpy as np
+import hashlib
+import pyarrow.plasma as plasma
+import SharedArray as sa
 import copy
 import itertools
 import json
 from collections import defaultdict
-from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen, PlasmaManager
+from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond
 from frigate.objects import ObjectTracker
 from frigate.edgetpu import RemoteObjectDetector
 from frigate.motion import MotionDetector

+# TODO: add back opencv fallback
 def get_frame_shape(source):
     ffprobe_cmd = " ".join([
         'ffprobe',
@@ -96,82 +99,16 @@ def create_tensor_input(frame, region):
     # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
     return np.expand_dims(cropped_frame, axis=0)

-def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
-    if not ffmpeg_process is None:
-        print("Terminating the existing ffmpeg process...")
-        ffmpeg_process.terminate()
-        try:
-            print("Waiting for ffmpeg to exit gracefully...")
-            ffmpeg_process.communicate(timeout=30)
-        except sp.TimeoutExpired:
-            print("FFmpeg didnt exit. Force killing...")
-            ffmpeg_process.kill()
-            ffmpeg_process.communicate()
-        ffmpeg_process = None
-
-    print("Creating ffmpeg process...")
-    print(" ".join(ffmpeg_cmd))
-    process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True)
-    return process
-
-class CameraCapture(threading.Thread):
-    def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, detection_frame):
-        threading.Thread.__init__(self)
-        self.name = name
-        self.frame_shape = frame_shape
-        self.frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-        self.frame_queue = frame_queue
-        self.take_frame = take_frame
-        self.fps = fps
-        self.skipped_fps = EventsPerSecond()
-        self.plasma_client = PlasmaManager()
-        self.ffmpeg_process = ffmpeg_process
-        self.current_frame = 0
-        self.last_frame = 0
-        self.detection_frame = detection_frame
-
-    def run(self):
-        frame_num = 0
-        self.skipped_fps.start()
-        while True:
-            if self.ffmpeg_process.poll() != None:
-                print(f"{self.name}: ffmpeg process is not running. exiting capture thread...")
-                break
-
-            frame_bytes = self.ffmpeg_process.stdout.read(self.frame_size)
-            self.current_frame = datetime.datetime.now().timestamp()
-
-            if len(frame_bytes) == 0:
-                print(f"{self.name}: ffmpeg didnt return a frame. something is wrong.")
-                continue
-
-            self.fps.update()
-
-            frame_num += 1
-            if (frame_num % self.take_frame) != 0:
-                self.skipped_fps.update()
-                continue
-
-            # if the detection process is more than 1 second behind, skip this frame
-            if self.detection_frame.value > 0.0 and (self.last_frame - self.detection_frame.value) > 1:
-                self.skipped_fps.update()
-                continue
-
-            # put the frame in the plasma store
-            self.plasma_client.put(f"{self.name}{self.current_frame}",
-                np
-                .frombuffer(frame_bytes, np.uint8)
-                .reshape(self.frame_shape)
-            )
-            # add to the queue
-            self.frame_queue.put(self.current_frame)
-
-            self.last_frame = self.current_frame
-
-def track_camera(name, config, global_objects_config, frame_queue, frame_shape, detection_queue, detected_objects_queue, fps, detection_fps, read_start, detection_frame):
+def track_camera(name, config, ffmpeg_global_config, global_objects_config, detect_lock, detect_ready, frame_ready, detected_objects_queue, fps, skipped_fps, detection_fps):
     print(f"Starting process for {name}: {os.getpid()}")
-    listen()
-
-    detection_frame.value = 0.0

+    # Merge the ffmpeg config with the global config
+    ffmpeg = config.get('ffmpeg', {})
+    ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
+    ffmpeg_global_args = ffmpeg.get('global_args', ffmpeg_global_config['global_args'])
+    ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', ffmpeg_global_config['hwaccel_args'])
+    ffmpeg_input_args = ffmpeg.get('input_args', ffmpeg_global_config['input_args'])
+    ffmpeg_output_args = ffmpeg.get('output_args', ffmpeg_global_config['output_args'])

     # Merge the tracked object config with the global config
     camera_objects_config = config.get('objects', {})
@@ -185,7 +122,18 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
     for obj in objects_with_config:
         object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}

-    frame = np.zeros(frame_shape, np.uint8)
+    expected_fps = config['fps']
+    take_frame = config.get('take_frame', 1)
+
+    frame_shape = get_frame_shape(ffmpeg_input)
+    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
+
+    try:
+        sa.delete(name)
+    except:
+        pass
+
+    frame = sa.create(name, shape=frame_shape, dtype=np.uint8)

     # load in the mask for object detection
     if 'mask' in config:
@@ -198,36 +146,66 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
         mask[:] = 255

     motion_detector = MotionDetector(frame_shape, mask, resize_factor=6)
-    object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue)
+    object_detector = RemoteObjectDetector('/labelmap.txt', detect_lock, detect_ready, frame_ready)

     object_tracker = ObjectTracker(10)

-    plasma_client = PlasmaManager()
+    ffmpeg_cmd = (['ffmpeg'] +
+            ffmpeg_global_args +
+            ffmpeg_hwaccel_args +
+            ffmpeg_input_args +
+            ['-i', ffmpeg_input] +
+            ffmpeg_output_args +
+            ['pipe:'])
+
+    print(" ".join(ffmpeg_cmd))
+
+    ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size)
+
+    plasma_client = plasma.connect("/tmp/plasma")
+    frame_num = 0

     avg_wait = 0.0
     fps_tracker = EventsPerSecond()
+    skipped_fps_tracker = EventsPerSecond()
     fps_tracker.start()
+    skipped_fps_tracker.start()
     object_detector.fps.start()
     while True:
-        read_start.value = datetime.datetime.now().timestamp()
-        frame_time = frame_queue.get()
-        duration = datetime.datetime.now().timestamp()-read_start.value
-        read_start.value = 0.0
+        start = datetime.datetime.now().timestamp()
+        frame_bytes = ffmpeg_process.stdout.read(frame_size)
+        duration = datetime.datetime.now().timestamp()-start
         avg_wait = (avg_wait*99+duration)/100
-        detection_frame.value = frame_time
-
-        # Get frame from plasma store
-        frame = plasma_client.get(f"{name}{frame_time}")

-        if frame is plasma.ObjectNotAvailable:
+        if not frame_bytes:
+            break
+
+        # limit frame rate
+        frame_num += 1
+        if (frame_num % take_frame) != 0:
             continue

         fps_tracker.update()
         fps.value = fps_tracker.eps()
         detection_fps.value = object_detector.fps.eps()

+        frame_time = datetime.datetime.now().timestamp()
+
+        # Store frame in numpy array
+        frame[:] = (np
+            .frombuffer(frame_bytes, np.uint8)
+            .reshape(frame_shape))
+
         # look for motion
         motion_boxes = motion_detector.detect(frame)

+        # skip object detection if we are below the min_fps and wait time is less than half the average
+        if frame_num > 100 and fps.value < expected_fps-1 and duration < 0.5*avg_wait:
+            skipped_fps_tracker.update()
+            skipped_fps.value = skipped_fps_tracker.eps()
+            continue
+
+        skipped_fps.value = skipped_fps_tracker.eps()
+
         tracked_objects = object_tracker.tracked_objects.values()

         # merge areas of motion that intersect with a known tracked object into a single area to look at
@@ -327,7 +305,7 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
             for index in idxs:
                 obj = group[index[0]]
-                if clipped(obj, frame_shape):
+                if clipped(obj, frame_shape): #obj['clipped']:
                     box = obj[2]
                     # calculate a new region that will hopefully get the entire object
                     region = calculate_region(frame_shape,
@@ -367,7 +345,8 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
         # now that we have refined our detections, we need to track objects
         object_tracker.match_and_update(frame_time, detections)

+        # put the frame in the plasma store
+        object_id = hashlib.sha1(str.encode(f"{name}{frame_time}")).digest()
+        plasma_client.put(frame, plasma.ObjectID(object_id))
         # add to the queue
         detected_objects_queue.put((name, frame_time, object_tracker.tracked_objects))
+
+    print(f"{name}: exiting subprocess")