Compare commits

..

15 Commits

Author SHA1 Message Date
Blake Blackshear
30ad0e30f8 allow mqtt password to be set by env var 2020-03-01 07:23:56 -06:00
Blake Blackshear
7bad89c9bf update benchmark script to mirror actual frigate use 2020-03-01 07:16:56 -06:00
Blake Blackshear
f077c397f4 improve detection processing and restart when stuck 2020-03-01 07:16:49 -06:00
Blake Blackshear
cc729d83a8 handle ffmpeg process failures in the camera process itself 2020-02-26 19:02:12 -06:00
Blake Blackshear
c520b81e49 add a few print statements for debugging 2020-02-25 20:37:12 -06:00
Blake Blackshear
9c304391c0 dont kill the camera process from the main process 2020-02-25 20:36:30 -06:00
Blake Blackshear
9a12b02d22 increase the buffer size a bit 2020-02-24 20:05:30 -06:00
Blake Blackshear
7686c510b3 add a few more metrics to debug 2020-02-23 18:11:39 -06:00
Blake Blackshear
2f5e322d3c cleanup the plasma store when finished with a frame 2020-02-23 18:11:08 -06:00
Blake Blackshear
1cd4c12104 dont redirect stdout for plasma store 2020-02-23 15:53:17 -06:00
Blake Blackshear
1a8b034685 reset detection fps 2020-02-23 15:53:00 -06:00
Blake Blackshear
da6dc03a57 dont change dictionary while iterating 2020-02-23 11:18:00 -06:00
Blake Blackshear
7fa3b70d2d allow specifying the frame size in the config instead of detecting 2020-02-23 07:56:14 -06:00
Blake Blackshear
1fc5a2bfd4 ensure missing objects are expired even when other object types are in the frame 2020-02-23 07:55:51 -06:00
Blake Blackshear
7e84da7dad Fix watchdog last_frame calculation 2020-02-23 07:55:16 -06:00
6 changed files with 248 additions and 147 deletions

View File

@@ -1,18 +1,79 @@
import statistics import os
from statistics import mean
import multiprocessing as mp
import numpy as np import numpy as np
import time import datetime
from frigate.edgetpu import ObjectDetector from frigate.edgetpu import ObjectDetector, EdgeTPUProcess, RemoteObjectDetector, load_labels
object_detector = ObjectDetector() my_frame = np.expand_dims(np.full((300,300,3), 1, np.uint8), axis=0)
labels = load_labels('/labelmap.txt')
frame = np.zeros((300,300,3), np.uint8) ######
input_frame = np.expand_dims(frame, axis=0) # Minimal same process runner
######
# object_detector = ObjectDetector()
# tensor_input = np.expand_dims(np.full((300,300,3), 0, np.uint8), axis=0)
detection_times = [] # start = datetime.datetime.now().timestamp()
for x in range(0, 100): # frame_times = []
start = time.monotonic() # for x in range(0, 1000):
object_detector.detect_raw(input_frame) # start_frame = datetime.datetime.now().timestamp()
detection_times.append(time.monotonic()-start)
print(f"Average inference time: {statistics.mean(detection_times)*1000:.2f}ms") # tensor_input[:] = my_frame
# detections = object_detector.detect_raw(tensor_input)
# parsed_detections = []
# for d in detections:
# if d[1] < 0.4:
# break
# parsed_detections.append((
# labels[int(d[0])],
# float(d[1]),
# (d[2], d[3], d[4], d[5])
# ))
# frame_times.append(datetime.datetime.now().timestamp()-start_frame)
# duration = datetime.datetime.now().timestamp()-start
# print(f"Processed for {duration:.2f} seconds.")
# print(f"Average frame processing time: {mean(frame_times)*1000:.2f}ms")
######
# Separate process runner
######
def start(id, num_detections, detection_queue):
object_detector = RemoteObjectDetector(str(id), '/labelmap.txt', detection_queue)
start = datetime.datetime.now().timestamp()
frame_times = []
for x in range(0, num_detections):
start_frame = datetime.datetime.now().timestamp()
detections = object_detector.detect(my_frame)
frame_times.append(datetime.datetime.now().timestamp()-start_frame)
duration = datetime.datetime.now().timestamp()-start
print(f"{id} - Processed for {duration:.2f} seconds.")
print(f"{id} - Average frame processing time: {mean(frame_times)*1000:.2f}ms")
edgetpu_process = EdgeTPUProcess()
# start(1, 1000, edgetpu_process.detect_lock, edgetpu_process.detect_ready, edgetpu_process.frame_ready)
####
# Multiple camera processes
####
camera_processes = []
for x in range(0, 10):
camera_process = mp.Process(target=start, args=(x, 100, edgetpu_process.detection_queue))
camera_process.daemon = True
camera_processes.append(camera_process)
start = datetime.datetime.now().timestamp()
for p in camera_processes:
p.start()
for p in camera_processes:
p.join()
duration = datetime.datetime.now().timestamp()-start
print(f"Total - Processed for {duration:.2f} seconds.")

View File

@@ -1,3 +1,4 @@
import os
import cv2 import cv2
import time import time
import datetime import datetime
@@ -16,6 +17,8 @@ from frigate.object_processing import TrackedObjectProcessor
from frigate.util import EventsPerSecond from frigate.util import EventsPerSecond
from frigate.edgetpu import EdgeTPUProcess from frigate.edgetpu import EdgeTPUProcess
FRIGATE_VARS = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
with open('/config/config.yml') as f: with open('/config/config.yml') as f:
CONFIG = yaml.safe_load(f) CONFIG = yaml.safe_load(f)
@@ -24,6 +27,8 @@ MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883)
MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate') MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate')
MQTT_USER = CONFIG.get('mqtt', {}).get('user') MQTT_USER = CONFIG.get('mqtt', {}).get('user')
MQTT_PASS = CONFIG.get('mqtt', {}).get('password') MQTT_PASS = CONFIG.get('mqtt', {}).get('password')
if not MQTT_PASS is None:
MQTT_PASS = MQTT_PASS.format(**FRIGATE_VARS)
MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate') MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate')
# Set the default FFmpeg config # Set the default FFmpeg config
@@ -66,26 +71,22 @@ class CameraWatchdog(threading.Thread):
time.sleep(10) time.sleep(10)
while True: while True:
# wait a bit before checking # wait a bit before checking
time.sleep(10) time.sleep(30)
if (self.tflite_process.detection_start.value > 0.0 and
datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10):
print("Detection appears to be stuck. Restarting detection process")
time.sleep(30)
for name, camera_process in self.camera_processes.items(): for name, camera_process in self.camera_processes.items():
process = camera_process['process'] process = camera_process['process']
if (datetime.datetime.now().timestamp() - self.object_processor.get_current_frame_time(name)) > 30:
print(f"Last frame for {name} is more than 30 seconds old...")
if process.is_alive():
process.terminate()
print("Waiting for process to exit gracefully...")
process.join(timeout=30)
if process.exitcode is None:
print("Process didnt exit. Force killing...")
process.kill()
process.join()
if not process.is_alive(): if not process.is_alive():
print(f"Process for {name} is not alive. Starting again...") print(f"Process for {name} is not alive. Starting again...")
camera_process['fps'].value = float(self.config[name]['fps']) camera_process['fps'].value = float(self.config[name]['fps'])
camera_process['skipped_fps'].value = 0.0 camera_process['skipped_fps'].value = 0.0
camera_process['detection_fps'].value = 0.0
process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
self.tflite_process.detect_lock, self.tflite_process.detect_ready, self.tflite_process.frame_ready, self.tracked_objects_queue, self.tflite_process.detection_queue, self.tracked_objects_queue,
camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'])) camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps']))
process.daemon = True process.daemon = True
camera_process['process'] = process camera_process['process'] = process
@@ -117,7 +118,7 @@ def main():
# start plasma store # start plasma store
plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma'] plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma']
plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL) plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL)
time.sleep(1) time.sleep(1)
rc = plasma_process.poll() rc = plasma_process.poll()
if rc is not None: if rc is not None:
@@ -144,11 +145,10 @@ def main():
camera_processes[name] = { camera_processes[name] = {
'fps': mp.Value('d', float(config['fps'])), 'fps': mp.Value('d', float(config['fps'])),
'skipped_fps': mp.Value('d', 0.0), 'skipped_fps': mp.Value('d', 0.0),
'detection_fps': mp.Value('d', 0.0), 'detection_fps': mp.Value('d', 0.0)
'last_frame': datetime.datetime.now().timestamp()
} }
camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
tflite_process.detect_lock, tflite_process.detect_ready, tflite_process.frame_ready, tracked_objects_queue, tflite_process.detection_queue, tracked_objects_queue,
camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'])) camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps']))
camera_process.daemon = True camera_process.daemon = True
camera_processes[name]['process'] = camera_process camera_processes[name]['process'] = camera_process
@@ -182,16 +182,23 @@ def main():
for name, camera_stats in camera_processes.items(): for name, camera_stats in camera_processes.items():
total_detection_fps += camera_stats['detection_fps'].value total_detection_fps += camera_stats['detection_fps'].value
stats[name] = { stats[name] = {
'fps': camera_stats['fps'].value, 'fps': round(camera_stats['fps'].value, 2),
'skipped_fps': camera_stats['skipped_fps'].value, 'skipped_fps': round(camera_stats['skipped_fps'].value, 2),
'detection_fps': camera_stats['detection_fps'].value 'detection_fps': round(camera_stats['detection_fps'].value, 2)
} }
stats['coral'] = { stats['coral'] = {
'fps': total_detection_fps, 'fps': round(total_detection_fps, 2),
'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2) 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2),
'detection_queue': tflite_process.detection_queue.qsize(),
'detection_start': tflite_process.detection_start.value
} }
rc = plasma_process.poll()
stats['plasma_store_rc'] = rc
stats['tracked_objects_queue'] = tracked_objects_queue.qsize()
return jsonify(stats) return jsonify(stats)
@app.route('/<camera_name>/<label>/best.jpg') @app.route('/<camera_name>/<label>/best.jpg')

View File

@@ -1,8 +1,10 @@
import os import os
import datetime import datetime
import hashlib
import multiprocessing as mp import multiprocessing as mp
import numpy as np import numpy as np
import SharedArray as sa import SharedArray as sa
import pyarrow.plasma as plasma
import tflite_runtime.interpreter as tflite import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate from tflite_runtime.interpreter import load_delegate
from frigate.util import EventsPerSecond from frigate.util import EventsPerSecond
@@ -60,71 +62,68 @@ class ObjectDetector():
return detections return detections
class EdgeTPUProcess(): def run_detector(detection_queue, avg_speed, start):
def __init__(self):
# TODO: see if we can use the plasma store with a queue and maintain the same speeds
try:
sa.delete("frame")
except:
pass
try:
sa.delete("detections")
except:
pass
self.input_frame = sa.create("frame", shape=(1,300,300,3), dtype=np.uint8)
self.detections = sa.create("detections", shape=(20,6), dtype=np.float32)
self.detect_lock = mp.Lock()
self.detect_ready = mp.Event()
self.frame_ready = mp.Event()
self.avg_inference_speed = mp.Value('d', 0.01)
def run_detector(detect_ready, frame_ready, avg_speed):
print(f"Starting detection process: {os.getpid()}") print(f"Starting detection process: {os.getpid()}")
plasma_client = plasma.connect("/tmp/plasma")
object_detector = ObjectDetector() object_detector = ObjectDetector()
input_frame = sa.attach("frame")
detections = sa.attach("detections")
while True: while True:
# wait until a frame is ready object_id_str = detection_queue.get()
frame_ready.wait() object_id_hash = hashlib.sha1(str.encode(object_id_str))
start = datetime.datetime.now().timestamp() object_id = plasma.ObjectID(object_id_hash.digest())
# signal that the process is busy input_frame = plasma_client.get(object_id, timeout_ms=0)
frame_ready.clear()
detections[:] = object_detector.detect_raw(input_frame) start.value = datetime.datetime.now().timestamp()
# signal that the process is ready to detect
detect_ready.set() # detect and put the output in the plasma store
duration = datetime.datetime.now().timestamp()-start object_id_out = hashlib.sha1(str.encode(f"out-{object_id_str}")).digest()
plasma_client.put(object_detector.detect_raw(input_frame), plasma.ObjectID(object_id_out))
duration = datetime.datetime.now().timestamp()-start.value
start.value = 0.0
avg_speed.value = (avg_speed.value*9 + duration)/10 avg_speed.value = (avg_speed.value*9 + duration)/10
self.detect_process = mp.Process(target=run_detector, args=(self.detect_ready, self.frame_ready, self.avg_inference_speed)) class EdgeTPUProcess():
def __init__(self):
self.detection_queue = mp.Queue()
self.avg_inference_speed = mp.Value('d', 0.01)
self.detection_start = mp.Value('d', 0.0)
self.detect_process = None
self.start_or_restart()
def start_or_restart(self):
self.detection_start.value = 0.0
if (not self.detect_process is None) and self.detect_process.is_alive():
self.detect_process.terminate()
print("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None:
print("Detection process didnt exit. Force killing...")
self.detect_process.kill()
self.detect_process.join()
self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start))
self.detect_process.daemon = True self.detect_process.daemon = True
self.detect_process.start() self.detect_process.start()
class RemoteObjectDetector(): class RemoteObjectDetector():
def __init__(self, labels, detect_lock, detect_ready, frame_ready): def __init__(self, name, labels, detection_queue):
self.labels = load_labels(labels) self.labels = load_labels(labels)
self.name = name
self.input_frame = sa.attach("frame")
self.detections = sa.attach("detections")
self.fps = EventsPerSecond() self.fps = EventsPerSecond()
self.plasma_client = plasma.connect("/tmp/plasma")
self.detect_lock = detect_lock self.detection_queue = detection_queue
self.detect_ready = detect_ready
self.frame_ready = frame_ready
def detect(self, tensor_input, threshold=.4): def detect(self, tensor_input, threshold=.4):
detections = [] detections = []
with self.detect_lock:
self.input_frame[:] = tensor_input now = f"{self.name}-{str(datetime.datetime.now().timestamp())}"
# unset detections and signal that a frame is ready object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest())
self.detect_ready.clear() object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest())
self.frame_ready.set() self.plasma_client.put(tensor_input, object_id_frame)
# wait until the detection process is finished, self.detection_queue.put(now)
self.detect_ready.wait() raw_detections = self.plasma_client.get(object_id_detections)
for d in self.detections:
for d in raw_detections:
if d[1] < threshold: if d[1] < threshold:
break break
detections.append(( detections.append((
@@ -132,5 +131,6 @@ class RemoteObjectDetector():
float(d[1]), float(d[1]),
(d[2], d[3], d[4], d[5]) (d[2], d[3], d[4], d[5])
)) ))
self.plasma_client.delete([object_id_frame, object_id_detections])
self.fps.update() self.fps.update()
return detections return detections

View File

@@ -34,7 +34,8 @@ class TrackedObjectProcessor(threading.Thread):
'best_objects': {}, 'best_objects': {},
'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')), 'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')),
'tracked_objects': {}, 'tracked_objects': {},
'current_frame_time': datetime.datetime.now().timestamp() 'current_frame': np.zeros((720,1280,3), np.uint8),
'object_id': None
}) })
def get_best(self, camera, label): def get_best(self, camera, label):
@@ -46,9 +47,6 @@ class TrackedObjectProcessor(threading.Thread):
def get_current_frame(self, camera): def get_current_frame(self, camera):
return self.camera_data[camera]['current_frame'] return self.camera_data[camera]['current_frame']
def get_current_frame_time(self, camera):
return self.camera_data[camera]['current_frame_time']
def run(self): def run(self):
while True: while True:
camera, frame_time, tracked_objects = self.tracked_objects_queue.get() camera, frame_time, tracked_objects = self.tracked_objects_queue.get()
@@ -64,8 +62,9 @@ class TrackedObjectProcessor(threading.Thread):
object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}")) object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}"))
object_id_bytes = object_id_hash.digest() object_id_bytes = object_id_hash.digest()
object_id = plasma.ObjectID(object_id_bytes) object_id = plasma.ObjectID(object_id_bytes)
current_frame = self.plasma_client.get(object_id) current_frame = self.plasma_client.get(object_id, timeout_ms=0)
if not current_frame is plasma.ObjectNotAvailable:
# draw the bounding boxes on the frame # draw the bounding boxes on the frame
for obj in tracked_objects.values(): for obj in tracked_objects.values():
thickness = 2 thickness = 2
@@ -90,7 +89,12 @@ class TrackedObjectProcessor(threading.Thread):
# Set the current frame as ready # Set the current frame as ready
### ###
self.camera_data[camera]['current_frame'] = current_frame self.camera_data[camera]['current_frame'] = current_frame
self.camera_data[camera]['current_frame_time'] = frame_time
# store the object id, so you can delete it at the next loop
previous_object_id = self.camera_data[camera]['object_id']
if not previous_object_id is None:
self.plasma_client.delete([previous_object_id])
self.camera_data[camera]['object_id'] = object_id
### ###
# Maintain the highest scoring recent object and frame for each label # Maintain the highest scoring recent object and frame for each label
@@ -104,10 +108,10 @@ class TrackedObjectProcessor(threading.Thread):
# if the object is a higher score than the current best score # if the object is a higher score than the current best score
# or the current object is more than 1 minute old, use the new object # or the current object is more than 1 minute old, use the new object
if obj['score'] > best_objects[obj['label']]['score'] or (now - best_objects[obj['label']]['frame_time']) > 60: if obj['score'] > best_objects[obj['label']]['score'] or (now - best_objects[obj['label']]['frame_time']) > 60:
obj['frame'] = np.copy(current_frame) obj['frame'] = np.copy(self.camera_data[camera]['current_frame'])
best_objects[obj['label']] = obj best_objects[obj['label']] = obj
else: else:
obj['frame'] = np.copy(current_frame) obj['frame'] = np.copy(self.camera_data[camera]['current_frame'])
best_objects[obj['label']] = obj best_objects[obj['label']] = obj
### ###

View File

@@ -49,14 +49,6 @@ class ObjectTracker():
obj['history'] = [entry] obj['history'] = [entry]
def match_and_update(self, frame_time, new_objects): def match_and_update(self, frame_time, new_objects):
if len(new_objects) == 0:
for id in list(self.tracked_objects.keys()):
if self.disappeared[id] >= self.max_disappeared:
self.deregister(id)
else:
self.disappeared[id] += 1
return
# group by name # group by name
new_object_groups = defaultdict(lambda: []) new_object_groups = defaultdict(lambda: [])
for obj in new_objects: for obj in new_objects:
@@ -69,6 +61,18 @@ class ObjectTracker():
'frame_time': frame_time 'frame_time': frame_time
}) })
# update any tracked objects with labels that are not
# seen in the current objects and deregister if needed
for obj in list(self.tracked_objects.values()):
if not obj['label'] in new_object_groups:
if self.disappeared[obj['id']] >= self.max_disappeared:
self.deregister(obj['id'])
else:
self.disappeared[obj['id']] += 1
if len(new_objects) == 0:
return
# track objects for each label type # track objects for each label type
for label, group in new_object_groups.items(): for label, group in new_object_groups.items():
current_objects = [o for o in self.tracked_objects.values() if o['label'] == label] current_objects = [o for o in self.tracked_objects.values() if o['label'] == label]

View File

@@ -20,7 +20,6 @@ from frigate.objects import ObjectTracker
from frigate.edgetpu import RemoteObjectDetector from frigate.edgetpu import RemoteObjectDetector
from frigate.motion import MotionDetector from frigate.motion import MotionDetector
# TODO: add back opencv fallback
def get_frame_shape(source): def get_frame_shape(source):
ffprobe_cmd = " ".join([ ffprobe_cmd = " ".join([
'ffprobe', 'ffprobe',
@@ -99,7 +98,23 @@ def create_tensor_input(frame, region):
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
return np.expand_dims(cropped_frame, axis=0) return np.expand_dims(cropped_frame, axis=0)
def track_camera(name, config, ffmpeg_global_config, global_objects_config, detect_lock, detect_ready, frame_ready, detected_objects_queue, fps, skipped_fps, detection_fps): def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
if not ffmpeg_process is None:
print("Terminating the existing ffmpeg process...")
ffmpeg_process.terminate()
try:
print("Waiting for ffmpeg to exit gracefully...")
ffmpeg_process.wait(timeout=30)
except sp.TimeoutExpired:
print("FFmpeg didnt exit. Force killing...")
ffmpeg_process.kill()
ffmpeg_process.wait()
print("Creating ffmpeg process...")
print(" ".join(ffmpeg_cmd))
return sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size*10)
def track_camera(name, config, ffmpeg_global_config, global_objects_config, detection_queue, detected_objects_queue, fps, skipped_fps, detection_fps):
print(f"Starting process for {name}: {os.getpid()}") print(f"Starting process for {name}: {os.getpid()}")
# Merge the ffmpeg config with the global config # Merge the ffmpeg config with the global config
@@ -109,6 +124,13 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', ffmpeg_global_config['hwaccel_args']) ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', ffmpeg_global_config['hwaccel_args'])
ffmpeg_input_args = ffmpeg.get('input_args', ffmpeg_global_config['input_args']) ffmpeg_input_args = ffmpeg.get('input_args', ffmpeg_global_config['input_args'])
ffmpeg_output_args = ffmpeg.get('output_args', ffmpeg_global_config['output_args']) ffmpeg_output_args = ffmpeg.get('output_args', ffmpeg_global_config['output_args'])
ffmpeg_cmd = (['ffmpeg'] +
ffmpeg_global_args +
ffmpeg_hwaccel_args +
ffmpeg_input_args +
['-i', ffmpeg_input] +
ffmpeg_output_args +
['pipe:'])
# Merge the tracked object config with the global config # Merge the tracked object config with the global config
camera_objects_config = config.get('objects', {}) camera_objects_config = config.get('objects', {})
@@ -125,7 +147,11 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
expected_fps = config['fps'] expected_fps = config['fps']
take_frame = config.get('take_frame', 1) take_frame = config.get('take_frame', 1)
if 'width' in config and 'height' in config:
frame_shape = (config['height'], config['width'], 3)
else:
frame_shape = get_frame_shape(ffmpeg_input) frame_shape = get_frame_shape(ffmpeg_input)
frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2] frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
try: try:
@@ -146,21 +172,11 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
mask[:] = 255 mask[:] = 255
motion_detector = MotionDetector(frame_shape, mask, resize_factor=6) motion_detector = MotionDetector(frame_shape, mask, resize_factor=6)
object_detector = RemoteObjectDetector('/labelmap.txt', detect_lock, detect_ready, frame_ready) object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue)
object_tracker = ObjectTracker(10) object_tracker = ObjectTracker(10)
ffmpeg_cmd = (['ffmpeg'] + ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
ffmpeg_global_args +
ffmpeg_hwaccel_args +
ffmpeg_input_args +
['-i', ffmpeg_input] +
ffmpeg_output_args +
['pipe:'])
print(" ".join(ffmpeg_cmd))
ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size)
plasma_client = plasma.connect("/tmp/plasma") plasma_client = plasma.connect("/tmp/plasma")
frame_num = 0 frame_num = 0
@@ -177,7 +193,14 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
avg_wait = (avg_wait*99+duration)/100 avg_wait = (avg_wait*99+duration)/100
if not frame_bytes: if not frame_bytes:
break rc = ffmpeg_process.poll()
if rc is not None:
print(f"{name}: ffmpeg_process exited unexpectedly with {rc}")
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process)
time.sleep(10)
else:
print(f"{name}: ffmpeg_process is still running but didnt return any bytes")
continue
# limit frame rate # limit frame rate
frame_num += 1 frame_num += 1
@@ -350,3 +373,5 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
plasma_client.put(frame, plasma.ObjectID(object_id)) plasma_client.put(frame, plasma.ObjectID(object_id))
# add to the queue # add to the queue
detected_objects_queue.put((name, frame_time, object_tracker.tracked_objects)) detected_objects_queue.put((name, frame_time, object_tracker.tracked_objects))
print(f"{name}: exiting subprocess")