Limit shm frame count (#12363)

* Limited shm frame count (#12346)

* Only keep 2x detect fps frames in SHM

* Don't delete previous shm frames in output

* Catch case where images do not exist

* Ensure files are closed

* Clear out all frames when shutting down

* Correct the number of frames saved

* Simplify empty shm error handling

* Improve frame safety

* Add handler logs when frame is None

* Don't fail on cleanup

* Cleanup logging

* Update docs

* Update calculation

* Restore condition

* Fix case where thumbnail is saved without frame

* Adjust debug logs

* Calculate best shm frame count

* Fix shm count calculation

* Catch missing frame

* Formatting

* Clarify docs

* Catch none frame in autotracking
This commit is contained in:
Nicolas Mowen
2024-09-03 10:22:30 -06:00
committed by GitHub
parent 58a471e466
commit 9afa1354da
11 changed files with 160 additions and 80 deletions

View File

@@ -537,7 +537,7 @@ class FrigateApp:
capture_process = mp.Process(
target=capture_camera,
name=f"camera_capture:{name}",
args=(name, config, self.camera_metrics[name]),
args=(name, config, self.shm_frame_count, self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name]["capture_process"] = capture_process
@@ -601,19 +601,34 @@ class FrigateApp:
self.frigate_watchdog.start()
def check_shm(self) -> None:
available_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1)
min_req_shm = 30
total_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1)
for _, camera in self.config.cameras.items():
min_req_shm += round(
(camera.detect.width * camera.detect.height * 1.5 * 9 + 270480)
/ 1048576,
1,
)
# required for log files + nginx cache
min_req_shm = 40 + 10
if available_shm < min_req_shm:
if self.config.birdseye.restream:
min_req_shm += 8
available_shm = total_shm - min_req_shm
cam_total_frame_size = 0
for camera in self.config.cameras.values():
if camera.enabled:
cam_total_frame_size += round(
(camera.detect.width * camera.detect.height * 1.5 + 270480)
/ 1048576,
1,
)
self.shm_frame_count = min(50, int(available_shm / (cam_total_frame_size)))
logger.debug(
f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {self.shm_frame_count} frames for each camera in SHM"
)
if self.shm_frame_count < 10:
logger.warning(
f"The current SHM size of {available_shm}MB is too small, recommend increasing it to at least {min_req_shm}MB."
f"The current SHM size of {total_shm}MB is too small, recommend increasing it to at least {round(min_req_shm + cam_total_frame_size)}MB."
)
def init_auth(self) -> None:
@@ -718,6 +733,7 @@ class FrigateApp:
self.init_historical_regions()
self.start_detected_frames_processor()
self.start_camera_processors()
self.check_shm()
self.start_camera_capture_processes()
self.start_audio_processors()
self.start_storage_maintainer()
@@ -729,7 +745,6 @@ class FrigateApp:
self.start_event_cleanup()
self.start_record_cleanup()
self.start_watchdog()
self.check_shm()
self.init_auth()
# Flask only listens for SIGINT, so we need to catch SIGTERM and send SIGINT

View File

@@ -78,9 +78,11 @@ class EmbeddingMaintainer(threading.Thread):
try:
frame_id = f"{camera}{data['frame_time']}"
yuv_frame = self.frame_manager.get(frame_id, camera_config.frame_shape_yuv)
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_id)
if yuv_frame is not None:
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_id)
except FileNotFoundError:
pass

View File

@@ -118,6 +118,7 @@ def run_detector(
)
if input_frame is None:
logger.warning(f"Failed to get frame {connection_id} from SHM")
continue
# detect and send the output

View File

@@ -147,7 +147,7 @@ class TrackedObject:
"""get median of scores for object."""
return median(self.score_history)
def update(self, current_frame_time, obj_data):
def update(self, current_frame_time: float, obj_data, has_valid_frame: bool):
thumb_update = False
significant_change = False
autotracker_update = False
@@ -168,7 +168,7 @@ class TrackedObject:
self.false_positive = self._is_false_positive()
self.active = self.is_active()
if not self.false_positive:
if not self.false_positive and has_valid_frame:
# determine if this frame is a better thumbnail
if self.thumbnail_data is None or is_better_thumbnail(
self.obj_data["label"],
@@ -668,10 +668,14 @@ class CameraState:
def update(self, frame_time, current_detections, motion_boxes, regions):
# get the new frame
frame_id = f"{self.name}{frame_time}"
current_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv
)
if current_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys())
@@ -695,14 +699,14 @@ class CameraState:
for id in updated_ids:
updated_obj = tracked_objects[id]
thumb_update, significant_update, autotracker_update = updated_obj.update(
frame_time, current_detections[id]
frame_time, current_detections[id], current_frame is not None
)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_time)
if thumb_update:
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
@@ -886,12 +890,16 @@ class CameraState:
with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.current_frame_time = frame_time
self.motion_boxes = motion_boxes
self.regions = regions
self._current_frame = current_frame
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = current_frame
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_id

View File

@@ -357,16 +357,14 @@ class BirdsEyeFrameManager:
frame = None
channel_dims = None
else:
try:
frame = self.frame_manager.get(
f"{camera}{frame_time}", self.config.cameras[camera].frame_shape_yuv
)
except FileNotFoundError:
# TODO: better frame management would prevent this edge case
logger.warning(
f"Unable to copy frame {camera}{frame_time} to birdseye."
)
frame = self.frame_manager.get(
f"{camera}{frame_time}", self.config.cameras[camera].frame_shape_yuv
)
if frame is None:
logger.debug(f"Unable to copy frame {camera}{frame_time} to birdseye.")
return
channel_dims = self.cameras[camera]["channel_dims"]
copy_yuv_to_position(

View File

@@ -45,7 +45,6 @@ def output_frames(
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
previous_frames = {}
# start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1"
@@ -99,6 +98,10 @@ def output_frames(
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
if frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
continue
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
@@ -128,10 +131,6 @@ def output_frames(
)
preview_write_times[camera] = frame_time
# delete frames after they have been used for output
if camera in previous_frames:
frame_manager.delete(f"{camera}{previous_frames[camera]}")
# if another camera generated a preview,
# check for any cameras that are currently offline
# and need to generate a preview
@@ -141,7 +140,7 @@ def output_frames(
preview_recorders[camera].flag_offline(frame_time)
preview_write_times[camera] = frame_time
previous_frames[camera] = frame_time
frame_manager.close(frame_id)
move_preview_frames("clips")
@@ -161,7 +160,7 @@ def output_frames(
frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.delete(frame_id)
frame_manager.close(frame_id)
detection_subscriber.stop()

View File

@@ -99,6 +99,10 @@ class PtzMotionEstimator:
frame_id, self.camera_config.frame_shape_yuv
)
if yuv_frame is None:
self.coord_transformations = None
return None
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)
# mask out detections for better motion estimation

View File

@@ -292,6 +292,11 @@ class ReviewSegmentMaintainer(threading.Thread):
yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv
)
if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
return
self.update_segment(
segment, camera_config, yuv_frame, active_objects, prev_data
)
@@ -305,6 +310,11 @@ class ReviewSegmentMaintainer(threading.Thread):
yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv
)
if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
return
segment.save_full_frame(camera_config, yuv_frame)
self.frame_manager.close(frame_id)
self.update_segment(segment, camera_config, None, [], prev_data)
@@ -401,6 +411,11 @@ class ReviewSegmentMaintainer(threading.Thread):
yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv
)
if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
return
self.active_review_segments[camera].update_frame(
camera_config, yuv_frame, active_objects
)

View File

@@ -687,31 +687,46 @@ class DictFrameManager(FrameManager):
class SharedMemoryFrameManager(FrameManager):
def __init__(self):
self.shm_store = {}
self.shm_store: dict[str, shared_memory.SharedMemory] = {}
def create(self, name, size) -> AnyStr:
def create(self, name: str, size) -> AnyStr:
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shm_store[name] = shm
return shm.buf
def get(self, name, shape):
def get(self, name: str, shape) -> Optional[np.ndarray]:
try:
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
except FileNotFoundError:
return None
def close(self, name: str):
if name in self.shm_store:
shm = self.shm_store[name]
self.shm_store[name].close()
del self.shm_store[name]
def delete(self, name: str):
if name in self.shm_store:
self.shm_store[name].close()
try:
self.shm_store[name].unlink()
except FileNotFoundError:
pass
del self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
def close(self, name):
if name in self.shm_store:
self.shm_store[name].close()
del self.shm_store[name]
def delete(self, name):
if name in self.shm_store:
self.shm_store[name].close()
self.shm_store[name].unlink()
del self.shm_store[name]
try:
shm = shared_memory.SharedMemory(name=name)
shm.close()
shm.unlink()
except FileNotFoundError:
pass
def create_mask(frame_shape, mask):

View File

@@ -94,7 +94,8 @@ def start_or_restart_ffmpeg(
def capture_frames(
ffmpeg_process,
camera_name,
config: CameraConfig,
shm_frame_count: int,
frame_shape,
frame_manager: FrameManager,
frame_queue,
@@ -108,27 +109,40 @@ def capture_frames(
frame_rate.start()
skipped_eps = EventsPerSecond()
skipped_eps.start()
shm_frames: list[str] = []
while True:
fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{camera_name}{current_frame.value}"
frame_name = f"{config.name}{current_frame.value}"
frame_buffer = frame_manager.create(frame_name, frame_size)
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
# update frame cache and cleanup existing frames
shm_frames.append(frame_name)
if len(shm_frames) > shm_frame_count:
expired_frame_name = shm_frames.pop(0)
frame_manager.delete(expired_frame_name)
except Exception:
# always delete the frame
frame_manager.delete(frame_name)
# shutdown has been initiated
if stop_event.is_set():
break
logger.error(f"{camera_name}: Unable to read frames from ffmpeg process.")
logger.error(f"{config.name}: Unable to read frames from ffmpeg process.")
if ffmpeg_process.poll() is not None:
logger.error(
f"{camera_name}: ffmpeg process is not running. exiting capture thread..."
f"{config.name}: ffmpeg process is not running. exiting capture thread..."
)
frame_manager.delete(frame_name)
break
continue
frame_rate.update()
@@ -137,12 +151,14 @@ def capture_frames(
try:
# add to the queue
frame_queue.put(current_frame.value, False)
# close the frame
frame_manager.close(frame_name)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()
frame_manager.delete(frame_name)
# clear out frames
for frame in shm_frames:
frame_manager.delete(frame)
class CameraWatchdog(threading.Thread):
@@ -150,6 +166,7 @@ class CameraWatchdog(threading.Thread):
self,
camera_name,
config: CameraConfig,
shm_frame_count: int,
frame_queue,
camera_fps,
skipped_fps,
@@ -160,6 +177,7 @@ class CameraWatchdog(threading.Thread):
self.logger = logging.getLogger(f"watchdog.{camera_name}")
self.camera_name = camera_name
self.config = config
self.shm_frame_count = shm_frame_count
self.capture_thread = None
self.ffmpeg_detect_process = None
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect")
@@ -282,7 +300,8 @@ class CameraWatchdog(threading.Thread):
)
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture(
self.camera_name,
self.config,
self.shm_frame_count,
self.ffmpeg_detect_process,
self.frame_shape,
self.frame_queue,
@@ -321,7 +340,8 @@ class CameraWatchdog(threading.Thread):
class CameraCapture(threading.Thread):
def __init__(
self,
camera_name,
config: CameraConfig,
shm_frame_count: int,
ffmpeg_process,
frame_shape,
frame_queue,
@@ -330,8 +350,9 @@ class CameraCapture(threading.Thread):
stop_event,
):
threading.Thread.__init__(self)
self.name = f"capture:{camera_name}"
self.camera_name = camera_name
self.name = f"capture:{config.name}"
self.config = config
self.shm_frame_count = shm_frame_count
self.frame_shape = frame_shape
self.frame_queue = frame_queue
self.fps = fps
@@ -345,7 +366,8 @@ class CameraCapture(threading.Thread):
def run(self):
capture_frames(
self.ffmpeg_process,
self.camera_name,
self.config,
self.shm_frame_count,
self.frame_shape,
self.frame_manager,
self.frame_queue,
@@ -356,7 +378,7 @@ class CameraCapture(threading.Thread):
)
def capture_camera(name, config: CameraConfig, process_info):
def capture_camera(name, config: CameraConfig, shm_frame_count: int, process_info):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
@@ -373,6 +395,7 @@ def capture_camera(name, config: CameraConfig, process_info):
camera_watchdog = CameraWatchdog(
name,
config,
shm_frame_count,
frame_queue,
process_info["camera_fps"],
process_info["skipped_fps"],
@@ -567,7 +590,7 @@ def process_frames(
)
if frame is None:
logger.info(f"{camera_name}: frame {frame_time} is not in memory store.")
logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.")
continue
# look for motion if enabled