Compare commits

..

3 Commits

Author            SHA1        Message                                                   Date
Blake Blackshear  8507bbbb31  make object processor resilient to plasma failures       2020-03-13 16:35:58 -05:00
Blake Blackshear  b6fcb88e5c  remove sharedarray references                             2020-03-13 15:50:27 -05:00
Blake Blackshear  d3cd4afa65  handle various scenarios with external process failures  2020-03-09 21:12:19 -05:00
6 changed files with 166 additions and 222 deletions

View File

@@ -7,7 +7,7 @@ RUN apt -qq update && apt -qq install --no-install-recommends -y \
     software-properties-common \
     # apt-transport-https ca-certificates \
     build-essential \
-    gnupg wget unzip tzdata \
+    gnupg wget unzip \
     # libcap-dev \
     && add-apt-repository ppa:deadsnakes/ppa -y \
     && apt -qq install --no-install-recommends -y \

View File

@@ -110,6 +110,13 @@ cameras:
     ################
     take_frame: 1
+    ################
+    # The expected framerate for the camera. Frigate will try and ensure it maintains this framerate
+    # by dropping frames as necessary. Setting this lower than the actual framerate will allow frigate
+    # to process every frame at the expense of realtime processing.
+    ################
+    fps: 5
     ################
     # Configuration for the snapshots in the debug view and mqtt
     ################
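The skip logic that backs this new fps option lands in frigate/video.py further down in this compare. As a rough standalone sketch of the decision (the function name and sample values are illustrative, not Frigate's API):

    # Sketch of the frame-skipping decision added in frigate/video.py: after a
    # warm-up period, if throughput has fallen more than 1 fps below the
    # configured rate while frames are still arriving quickly (read wait under
    # half the running average), skip object detection to catch up.
    def should_skip_detection(frame_num, current_fps, expected_fps, duration, avg_wait):
        return frame_num > 100 and current_fps < expected_fps - 1 and duration < 0.5 * avg_wait

    print(should_skip_detection(200, 3.2, 5, 0.01, 0.2))  # True: falling behind, reads are fast
    print(should_skip_detection(200, 4.9, 5, 0.01, 0.2))  # False: within 1 fps of the target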

View File

@@ -15,7 +15,7 @@ import logging
 from flask import Flask, Response, make_response, jsonify, request
 import paho.mqtt.client as mqtt
-from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
+from frigate.video import track_camera
 from frigate.object_processing import TrackedObjectProcessor
 from frigate.util import EventsPerSecond
 from frigate.edgetpu import EdgeTPUProcess

@@ -63,7 +63,7 @@ DEBUG = (CONFIG.get('debug', '0') == '1')
 def start_plasma_store():
     plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
-    plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
+    plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL)
     time.sleep(1)
     rc = plasma_process.poll()
     if rc is not None:
@@ -83,63 +83,60 @@ class CameraWatchdog(threading.Thread):
         time.sleep(10)
         while True:
             # wait a bit before checking
-            time.sleep(10)
-            now = datetime.datetime.now().timestamp()
+            time.sleep(30)

             # check the plasma process
             rc = self.plasma_process.poll()
             if rc != None:
                 print(f"plasma_process exited unexpectedly with {rc}")
                 self.plasma_process = start_plasma_store()
+                time.sleep(10)

             # check the detection process
-            detection_start = self.tflite_process.detection_start.value
-            if (detection_start > 0.0 and
-                now - detection_start > 10):
+            if (self.tflite_process.detection_start.value > 0.0 and
+                datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10):
                 print("Detection appears to be stuck. Restarting detection process")
                 self.tflite_process.start_or_restart()
+                time.sleep(30)
             elif not self.tflite_process.detect_process.is_alive():
                 print("Detection appears to have stopped. Restarting detection process")
                 self.tflite_process.start_or_restart()
+                time.sleep(30)

             # check the camera processes
             for name, camera_process in self.camera_processes.items():
                 process = camera_process['process']
                 if not process.is_alive():
-                    print(f"Track process for {name} is not alive. Starting again...")
-                    camera_process['process_fps'].value = 0.0
+                    print(f"Process for {name} is not alive. Starting again...")
+                    camera_process['fps'].value = float(self.config[name]['fps'])
+                    camera_process['skipped_fps'].value = 0.0
                     camera_process['detection_fps'].value = 0.0
                     camera_process['read_start'].value = 0.0
-                    process = mp.Process(target=track_camera, args=(name, self.config[name], GLOBAL_OBJECT_CONFIG, camera_process['frame_queue'],
-                        camera_process['frame_shape'], self.tflite_process.detection_queue, self.tracked_objects_queue,
-                        camera_process['process_fps'], camera_process['detection_fps'],
-                        camera_process['read_start'], camera_process['detection_frame']))
+                    camera_process['ffmpeg_pid'].value = 0
+                    process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
+                        self.tflite_process.detection_queue, self.tracked_objects_queue,
+                        camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'],
+                        camera_process['read_start'], camera_process['ffmpeg_pid']))
                     process.daemon = True
                     camera_process['process'] = process
                     process.start()
-                    print(f"Track process started for {name}: {process.pid}")
-                if not camera_process['capture_thread'].is_alive():
-                    frame_shape = camera_process['frame_shape']
-                    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-                    ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
-                    camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
-                        camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'])
-                    camera_capture.start()
-                    camera_process['ffmpeg_process'] = ffmpeg_process
-                    camera_process['capture_thread'] = camera_capture
-                elif now - camera_process['capture_thread'].current_frame > 5:
-                    print(f"No frames received from {name} in 5 seconds. Exiting ffmpeg...")
-                    ffmpeg_process = camera_process['ffmpeg_process']
-                    ffmpeg_process.terminate()
-                    try:
-                        print("Waiting for ffmpeg to exit gracefully...")
-                        ffmpeg_process.communicate(timeout=30)
-                    except sp.TimeoutExpired:
-                        print("FFmpeg didnt exit. Force killing...")
-                        ffmpeg_process.kill()
-                        ffmpeg_process.communicate()
+                    print(f"Camera_process started for {name}: {process.pid}")
+                if (camera_process['read_start'].value > 0.0 and
+                    datetime.datetime.now().timestamp() - camera_process['read_start'].value > 10):
+                    print(f"Process for {name} has been reading from ffmpeg for over 10 seconds long. Killing ffmpeg...")
+                    ffmpeg_pid = camera_process['ffmpeg_pid'].value
+                    if ffmpeg_pid != 0:
+                        try:
+                            os.kill(ffmpeg_pid, signal.SIGTERM)
+                        except OSError:
+                            print(f"Unable to terminate ffmpeg with pid {ffmpeg_pid}")
+                        time.sleep(10)
+                        try:
+                            os.kill(ffmpeg_pid, signal.SIGKILL)
+                            print(f"Unable to kill ffmpeg with pid {ffmpeg_pid}")
+                        except OSError:
+                            pass

 def main():
     # connect to mqtt and setup last will
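The watchdog's new kill sequence is the standard two-step escalation: SIGTERM, a grace period, then SIGKILL. A minimal sketch of the same pattern in isolation (the helper name and grace period are assumptions for illustration):

    import os
    import signal
    import time

    # Ask the process to exit with SIGTERM, wait a grace period, then force it
    # with SIGKILL; os.kill raises OSError if the pid is already gone.
    def stop_process(pid, grace_seconds=10):
        if pid == 0:
            return  # pid 0 means "not started yet" in the watchdog's bookkeeping
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            return  # already exited
        time.sleep(grace_seconds)
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            pass  # exited during the grace period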
@@ -183,56 +180,17 @@ def main():
     # start the camera processes
     camera_processes = {}
     for name, config in CONFIG['cameras'].items():
-        # Merge the ffmpeg config with the global config
-        ffmpeg = config.get('ffmpeg', {})
-        ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
-        ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
-        ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
-        ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
-        ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
-        ffmpeg_cmd = (['ffmpeg'] +
-                ffmpeg_global_args +
-                ffmpeg_hwaccel_args +
-                ffmpeg_input_args +
-                ['-i', ffmpeg_input] +
-                ffmpeg_output_args +
-                ['pipe:'])
-        if 'width' in config and 'height' in config:
-            frame_shape = (config['height'], config['width'], 3)
-        else:
-            frame_shape = get_frame_shape(ffmpeg_input)
-        frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-        take_frame = config.get('take_frame', 1)
-        detection_frame = mp.Value('d', 0.0)
-        ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
-        frame_queue = mp.SimpleQueue()
-        camera_fps = EventsPerSecond()
-        camera_fps.start()
-        camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame)
-        camera_capture.start()
         camera_processes[name] = {
-            'camera_fps': camera_fps,
-            'take_frame': take_frame,
-            'process_fps': mp.Value('d', 0.0),
+            'fps': mp.Value('d', float(config['fps'])),
+            'skipped_fps': mp.Value('d', 0.0),
             'detection_fps': mp.Value('d', 0.0),
-            'detection_frame': detection_frame,
             'read_start': mp.Value('d', 0.0),
-            'ffmpeg_process': ffmpeg_process,
-            'ffmpeg_cmd': ffmpeg_cmd,
-            'frame_queue': frame_queue,
-            'frame_shape': frame_shape,
-            'capture_thread': camera_capture
+            'ffmpeg_pid': mp.Value('i', 0)
         }

-        camera_process = mp.Process(target=track_camera, args=(name, config, GLOBAL_OBJECT_CONFIG, frame_queue, frame_shape,
-            tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['process_fps'],
-            camera_processes[name]['detection_fps'],
-            camera_processes[name]['read_start'], camera_processes[name]['detection_frame']))
+        camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
+            tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['fps'],
+            camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'],
+            camera_processes[name]['read_start'], camera_processes[name]['ffmpeg_pid']))
         camera_process.daemon = True
         camera_processes[name]['process'] = camera_process
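The per-camera state is now a handful of multiprocessing.Value slots ('d' for doubles, 'i' for the ffmpeg pid) instead of queue and thread objects, so the watchdog and the camera process share plain counters through shared memory. A minimal sketch of that mechanism, with a hypothetical worker:

    import multiprocessing as mp

    def worker(fps):
        fps.value = 4.75  # the child writes the shared double

    if __name__ == '__main__':
        fps = mp.Value('d', 0.0)  # 'd' = C double backed by shared memory
        p = mp.Process(target=worker, args=(fps,))
        p.start()
        p.join()
        print(round(fps.value, 2))  # 4.75, visible to the parent without pickling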
@@ -281,20 +239,13 @@ def main():
         for name, camera_stats in camera_processes.items():
             total_detection_fps += camera_stats['detection_fps'].value
-            capture_thread = camera_stats['capture_thread']
             stats[name] = {
-                'camera_fps': round(capture_thread.fps.eps(), 2),
-                'process_fps': round(camera_stats['process_fps'].value, 2),
-                'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
+                'fps': round(camera_stats['fps'].value, 2),
+                'skipped_fps': round(camera_stats['skipped_fps'].value, 2),
                 'detection_fps': round(camera_stats['detection_fps'].value, 2),
                 'read_start': camera_stats['read_start'].value,
                 'pid': camera_stats['process'].pid,
-                'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
-                'frame_info': {
-                    'read': capture_thread.current_frame,
-                    'detect': camera_stats['detection_frame'].value,
-                    'process': object_processor.camera_data[name]['current_frame_time']
-                }
+                'ffmpeg_pid': camera_stats['ffmpeg_pid'].value
             }

         stats['coral'] = {
@@ -342,9 +293,7 @@ def main():
                 if frame is None:
                     frame = np.zeros((height,int(height*16/9),3), np.uint8)

-                width = int(height*frame.shape[1]/frame.shape[0])
-                frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_LINEAR)
+                frame = cv2.resize(frame, dsize=(int(height*16/9), height), interpolation=cv2.INTER_LINEAR)
                 frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

                 ret, jpg = cv2.imencode('.jpg', frame)
@@ -353,7 +302,7 @@ def main():
     app.run(host='0.0.0.0', port=WEB_PORT, debug=False)

-    object_processor.join()
+    camera_watchdog.join()

     plasma_process.terminate()

View File

@@ -10,7 +10,7 @@ from collections import Counter, defaultdict
 import itertools
 import pyarrow.plasma as plasma
 import matplotlib.pyplot as plt
-from frigate.util import draw_box_with_label, PlasmaManager
+from frigate.util import draw_box_with_label
 from frigate.edgetpu import load_labels

 PATH_TO_LABELS = '/labelmap.txt'
@@ -34,10 +34,8 @@ class TrackedObjectProcessor(threading.Thread):
             'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')),
             'tracked_objects': {},
             'current_frame': np.zeros((720,1280,3), np.uint8),
-            'current_frame_time': 0.0,
             'object_id': None
         })
-        self.plasma_client = PlasmaManager()

     def get_best(self, camera, label):
         if label in self.camera_data[camera]['best_objects']:
@@ -47,8 +45,35 @@ class TrackedObjectProcessor(threading.Thread):
     def get_current_frame(self, camera):
         return self.camera_data[camera]['current_frame']

+    def connect_plasma_client(self):
+        while True:
+            try:
+                self.plasma_client = plasma.connect("/tmp/plasma")
+                return
+            except:
+                print(f"TrackedObjectProcessor: unable to connect plasma client")
+                time.sleep(10)
+
+    def get_from_plasma(self, object_id):
+        while True:
+            try:
+                return self.plasma_client.get(object_id, timeout_ms=0)
+            except:
+                self.connect_plasma_client()
+                time.sleep(1)
+
+    def delete_from_plasma(self, object_ids):
+        while True:
+            try:
+                self.plasma_client.delete(object_ids)
+                return
+            except:
+                self.connect_plasma_client()
+                time.sleep(1)
+
     def run(self):
+        self.connect_plasma_client()
         while True:
             camera, frame_time, tracked_objects = self.tracked_objects_queue.get()
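These wrappers all follow the same shape: retry forever, reconnecting the client between attempts. A standalone sketch of that pattern, where FlakyStore is a hypothetical stand-in for a plasma client whose backing store can restart:

    import time

    class FlakyStore:
        def __init__(self):
            self.calls = 0
        def get(self, key):
            self.calls += 1
            if self.calls < 3:
                raise ConnectionError("store unavailable")
            return f"value-for-{key}"

    def get_with_retry(store, key, reconnect, delay=0.1):
        while True:  # retry forever, reconnecting between attempts
            try:
                return store.get(key)
            except Exception:
                reconnect()
                time.sleep(delay)

    print(get_with_retry(FlakyStore(), "frame1", reconnect=lambda: None))  # value-for-frame1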
@@ -56,12 +81,14 @@ class TrackedObjectProcessor(threading.Thread):
             best_objects = self.camera_data[camera]['best_objects']
             current_object_status = self.camera_data[camera]['object_status']
             self.camera_data[camera]['tracked_objects'] = tracked_objects
-            self.camera_data[camera]['current_frame_time'] = frame_time

             ###
             # Draw tracked objects on the frame
             ###
-            current_frame = self.plasma_client.get(f"{camera}{frame_time}")
+            object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}"))
+            object_id_bytes = object_id_hash.digest()
+            object_id = plasma.ObjectID(object_id_bytes)
+            current_frame = self.get_from_plasma(object_id)

             if not current_frame is plasma.ObjectNotAvailable:
                 # draw the bounding boxes on the frame
@@ -85,14 +112,15 @@ class TrackedObjectProcessor(threading.Thread):
                     cv2.putText(current_frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)

                 ###
-                # Set the current frame
+                # Set the current frame as ready
                 ###
                 self.camera_data[camera]['current_frame'] = current_frame

-                # delete the previous frame from the plasma store and update the object id
-                if not self.camera_data[camera]['object_id'] is None:
-                    self.plasma_client.delete(self.camera_data[camera]['object_id'])
-                self.camera_data[camera]['object_id'] = f"{camera}{frame_time}"
+                # store the object id, so you can delete it at the next loop
+                previous_object_id = self.camera_data[camera]['object_id']
+                if not previous_object_id is None:
+                    self.delete_from_plasma([previous_object_id])
+                self.camera_data[camera]['object_id'] = object_id

             ###
             # Maintain the highest scoring recent object and frame for each label
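The ObjectID construction above works because plasma object IDs are exactly 20 bytes, which is also the length of a SHA-1 digest, so producer and consumer can derive the same id from the camera name and frame time without coordinating. A small sketch of the derivation (camera name and timestamp are made up; the plasma call is commented out since it needs pyarrow with the plasma module):

    import hashlib

    camera = "back_yard"          # hypothetical camera name
    frame_time = 1584135358.123   # hypothetical frame timestamp
    digest = hashlib.sha1(str.encode(f"{camera}{frame_time}")).digest()
    assert len(digest) == 20      # plasma.ObjectID requires exactly 20 bytes
    # object_id = plasma.ObjectID(digest)  # same id on both sides of the store
    print(digest.hex())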

View File

@@ -1,5 +1,4 @@
 import datetime
-import time
 import signal
 import traceback
 import collections
@@ -7,8 +6,6 @@ import numpy as np
 import cv2
 import threading
 import matplotlib.pyplot as plt
-import hashlib
-import pyarrow.plasma as plasma

 def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
     if color is None:
@@ -137,47 +134,4 @@ def print_stack(sig, frame):
     traceback.print_stack(frame)

 def listen():
     signal.signal(signal.SIGUSR1, print_stack)
-
-class PlasmaManager:
-    def __init__(self):
-        self.connect()
-
-    def connect(self):
-        while True:
-            try:
-                self.plasma_client = plasma.connect("/tmp/plasma")
-                return
-            except:
-                print(f"TrackedObjectProcessor: unable to connect plasma client")
-                time.sleep(10)
-
-    def get(self, name, timeout_ms=0):
-        object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
-        while True:
-            try:
-                return self.plasma_client.get(object_id, timeout_ms=timeout_ms)
-            except:
-                self.connect()
-                time.sleep(1)
-
-    def put(self, name, obj):
-        object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
-        while True:
-            try:
-                self.plasma_client.put(obj, object_id)
-                return
-            except Exception as e:
-                print(f"Failed to put in plasma: {e}")
-                self.connect()
-                time.sleep(1)
-
-    def delete(self, name):
-        object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
-        while True:
-            try:
-                self.plasma_client.delete([object_id])
-                return
-            except:
-                self.connect()
-                time.sleep(1)

View File

@@ -5,15 +5,16 @@ import cv2
 import queue
 import threading
 import ctypes
-import pyarrow.plasma as plasma
 import multiprocessing as mp
 import subprocess as sp
 import numpy as np
+import hashlib
+import pyarrow.plasma as plasma
 import copy
 import itertools
 import json
 from collections import defaultdict
-from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen, PlasmaManager
+from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen
 from frigate.objects import ObjectTracker
 from frigate.edgetpu import RemoteObjectDetector
 from frigate.motion import MotionDetector
@@ -96,7 +97,7 @@ def create_tensor_input(frame, region):
     # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
     return np.expand_dims(cropped_frame, axis=0)

-def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
+def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, pid, ffmpeg_process=None):
     if not ffmpeg_process is None:
         print("Terminating the existing ffmpeg process...")
         ffmpeg_process.terminate()
@@ -111,67 +112,29 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
     print("Creating ffmpeg process...")
     print(" ".join(ffmpeg_cmd))
-    process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True)
+    process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size*10)
+    pid.value = process.pid
     return process

-class CameraCapture(threading.Thread):
-    def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, detection_frame):
-        threading.Thread.__init__(self)
-        self.name = name
-        self.frame_shape = frame_shape
-        self.frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-        self.frame_queue = frame_queue
-        self.take_frame = take_frame
-        self.fps = fps
-        self.skipped_fps = EventsPerSecond()
-        self.plasma_client = PlasmaManager()
-        self.ffmpeg_process = ffmpeg_process
-        self.current_frame = 0
-        self.last_frame = 0
-        self.detection_frame = detection_frame
-
-    def run(self):
-        frame_num = 0
-        self.skipped_fps.start()
-        while True:
-            if self.ffmpeg_process.poll() != None:
-                print(f"{self.name}: ffmpeg process is not running. exiting capture thread...")
-                break
-
-            frame_bytes = self.ffmpeg_process.stdout.read(self.frame_size)
-            self.current_frame = datetime.datetime.now().timestamp()
-
-            if len(frame_bytes) == 0:
-                print(f"{self.name}: ffmpeg didnt return a frame. something is wrong.")
-                continue
-
-            self.fps.update()
-
-            frame_num += 1
-            if (frame_num % self.take_frame) != 0:
-                self.skipped_fps.update()
-                continue
-
-            # if the detection process is more than 1 second behind, skip this frame
-            if self.detection_frame.value > 0.0 and (self.last_frame - self.detection_frame.value) > 1:
-                self.skipped_fps.update()
-                continue
-
-            # put the frame in the plasma store
-            self.plasma_client.put(f"{self.name}{self.current_frame}",
-                np
-                .frombuffer(frame_bytes, np.uint8)
-                .reshape(self.frame_shape)
-            )
-            # add to the queue
-            self.frame_queue.put(self.current_frame)
-            self.last_frame = self.current_frame
-
-def track_camera(name, config, global_objects_config, frame_queue, frame_shape, detection_queue, detected_objects_queue, fps, detection_fps, read_start, detection_frame):
+def track_camera(name, config, ffmpeg_global_config, global_objects_config, detection_queue, detected_objects_queue, fps, skipped_fps, detection_fps, read_start, ffmpeg_pid):
     print(f"Starting process for {name}: {os.getpid()}")
     listen()

-    detection_frame.value = 0.0
+    # Merge the ffmpeg config with the global config
+    ffmpeg = config.get('ffmpeg', {})
+    ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
+    ffmpeg_restart_delay = ffmpeg.get('restart_delay', 0)
+    ffmpeg_global_args = ffmpeg.get('global_args', ffmpeg_global_config['global_args'])
+    ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', ffmpeg_global_config['hwaccel_args'])
+    ffmpeg_input_args = ffmpeg.get('input_args', ffmpeg_global_config['input_args'])
+    ffmpeg_output_args = ffmpeg.get('output_args', ffmpeg_global_config['output_args'])
+    ffmpeg_cmd = (['ffmpeg'] +
+            ffmpeg_global_args +
+            ffmpeg_hwaccel_args +
+            ffmpeg_input_args +
+            ['-i', ffmpeg_input] +
+            ffmpeg_output_args +
+            ['pipe:'])

     # Merge the tracked object config with the global config
     camera_objects_config = config.get('objects', {})
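With this change, track_camera owns the ffmpeg pipe directly. Because the command ends in rawvideo output to 'pipe:', one frame is exactly height*width*3 bytes, so a fixed-size read maps cleanly onto a numpy array. A sketch of that arithmetic with illustrative values (no ffmpeg process involved):

    import numpy as np

    frame_shape = (1080, 1920, 3)
    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]  # 6,220,800 bytes per frame
    frame_bytes = bytes(frame_size)  # stands in for ffmpeg_process.stdout.read(frame_size)
    frame = np.frombuffer(frame_bytes, np.uint8).reshape(frame_shape)
    print(frame.shape)  # (1080, 1920, 3)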
@@ -185,6 +148,16 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
     for obj in objects_with_config:
         object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}

+    expected_fps = config['fps']
+    take_frame = config.get('take_frame', 1)
+
+    if 'width' in config and 'height' in config:
+        frame_shape = (config['height'], config['width'], 3)
+    else:
+        frame_shape = get_frame_shape(ffmpeg_input)
+
+    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
     frame = np.zeros(frame_shape, np.uint8)

     # load in the mask for object detection
@@ -201,33 +174,63 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
     object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue)

     object_tracker = ObjectTracker(10)

-    plasma_client = PlasmaManager()
+    ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_pid)
+    plasma_client = plasma.connect("/tmp/plasma")

+    frame_num = 0
     avg_wait = 0.0
     fps_tracker = EventsPerSecond()
+    skipped_fps_tracker = EventsPerSecond()
     fps_tracker.start()
+    skipped_fps_tracker.start()
     object_detector.fps.start()
     while True:
+        rc = ffmpeg_process.poll()
+        if rc != None:
+            print(f"{name}: ffmpeg_process exited unexpectedly with {rc}")
+            print(f"Letting {name} rest for {ffmpeg_restart_delay} seconds before restarting...")
+            time.sleep(ffmpeg_restart_delay)
+            ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_pid, ffmpeg_process)
+            time.sleep(10)
+
         read_start.value = datetime.datetime.now().timestamp()
-        frame_time = frame_queue.get()
+        frame_bytes = ffmpeg_process.stdout.read(frame_size)
         duration = datetime.datetime.now().timestamp()-read_start.value
         read_start.value = 0.0
         avg_wait = (avg_wait*99+duration)/100
-        detection_frame.value = frame_time

-        # Get frame from plasma store
-        frame = plasma_client.get(f"{name}{frame_time}")
-
-        if frame is plasma.ObjectNotAvailable:
+        if len(frame_bytes) == 0:
+            print(f"{name}: ffmpeg_process didnt return any bytes")
+            continue
+
+        # limit frame rate
+        frame_num += 1
+        if (frame_num % take_frame) != 0:
             continue

         fps_tracker.update()
         fps.value = fps_tracker.eps()
         detection_fps.value = object_detector.fps.eps()

+        frame_time = datetime.datetime.now().timestamp()
+
+        # Store frame in numpy array
+        frame[:] = (np
+            .frombuffer(frame_bytes, np.uint8)
+            .reshape(frame_shape))
+
         # look for motion
         motion_boxes = motion_detector.detect(frame)

+        # skip object detection if we are below the min_fps and wait time is less than half the average
+        if frame_num > 100 and fps.value < expected_fps-1 and duration < 0.5*avg_wait:
+            skipped_fps_tracker.update()
+            skipped_fps.value = skipped_fps_tracker.eps()
+            continue
+
+        skipped_fps.value = skipped_fps_tracker.eps()
+
         tracked_objects = object_tracker.tracked_objects.values()

         # merge areas of motion that intersect with a known tracked object into a single area to look at
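The fps and skipped_fps values above come from frigate.util.EventsPerSecond via .update()/.eps(). A rough sliding-window equivalent, offered as an assumption of how such a tracker can be built rather than Frigate's exact implementation:

    import time
    from collections import deque

    class RateTracker:
        def __init__(self, window_seconds=10):
            self.window = window_seconds
            self.timestamps = deque()
        def update(self):
            self.timestamps.append(time.time())
        def eps(self):
            cutoff = time.time() - self.window
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()  # drop events outside the window
            return len(self.timestamps) / self.window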
@@ -327,7 +330,7 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
             for index in idxs:
                 obj = group[index[0]]
-                if clipped(obj, frame_shape):
+                if clipped(obj, frame_shape): #obj['clipped']:
                     box = obj[2]
                     # calculate a new region that will hopefully get the entire object
                     region = calculate_region(frame_shape,
@@ -367,6 +370,9 @@ def track_camera(name, config, global_objects_config, frame_queue, frame_shape,
         # now that we have refined our detections, we need to track objects
         object_tracker.match_and_update(frame_time, detections)

+        # put the frame in the plasma store
+        object_id = hashlib.sha1(str.encode(f"{name}{frame_time}")).digest()
+        plasma_client.put(frame, plasma.ObjectID(object_id))
+
         # add to the queue
         detected_objects_queue.put((name, frame_time, object_tracker.tracked_objects))