forked from Github/frigate
Limit recording retention to available storage (#3942)
* Add field and migration for segment size * Store the segment size in db * Add comment * Add default * Fix size parsing * Include segment size in recordings endpoint * Start adding storage maintainer * Add storage maintainer and calculate average sizes * Update comment * Store segment and hour avg sizes per camera * Formatting * Keep track of total segment and hour averages * Remove unused files * Cleanup 2 hours of recordings at a time * Formatting * Fix bug * Round segment size * Cleanup some comments * Handle case where segments are not deleted on initial run or is only retained segments * Improve cleanup log * Formatting * Fix typo and improve logging * Catch case where no recordings exist for camera * Specifically define sort * Handle edge case for cameras that only record part time * Increase definition of part time recorder * Remove warning about not supported storage based retention * Add note about storage based retention to recording docs * Add tests for storage maintenance calculation and cleanup * Format tests * Don't run for a camera with no recording segments * Get size of file from cache * Rework camera stats to be more efficient * Remove total and other inefficencies * Rewrite storage cleanup logic to be much more efficient * Fix existing tests * Fix bugs from tests * Add another test * Improve logging * Formatting * Set back correct loop time * Update name * Update comment * Only include segments that have a nonzero size * Catch case where camera has 0 nonzero segment durations * Add test to cover zero bandwidth migration case * Fix test * Incorrect boolean logic * Formatting * Explicity re-define iterator
This commit is contained in:
@@ -1,23 +1,17 @@
|
||||
import json
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
from multiprocessing.queues import Queue
|
||||
from multiprocessing.synchronize import Event
|
||||
from multiprocessing.context import Process
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
from logging.handlers import QueueHandler
|
||||
from typing import Optional
|
||||
from types import FrameType
|
||||
|
||||
import traceback
|
||||
import yaml
|
||||
from peewee_migrate import Router
|
||||
from playhouse.sqlite_ext import SqliteExtDatabase
|
||||
from playhouse.sqliteq import SqliteQueueDatabase
|
||||
from pydantic import ValidationError
|
||||
|
||||
from frigate.config import DetectorTypeEnum, FrigateConfig
|
||||
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
|
||||
@@ -32,6 +26,7 @@ from frigate.output import output_frames
|
||||
from frigate.plus import PlusApi
|
||||
from frigate.record import RecordingCleanup, RecordingMaintainer
|
||||
from frigate.stats import StatsEmitter, stats_init
|
||||
from frigate.storage import StorageMaintainer
|
||||
from frigate.version import VERSION
|
||||
from frigate.video import capture_camera, track_camera
|
||||
from frigate.watchdog import FrigateWatchdog
|
||||
@@ -310,6 +305,10 @@ class FrigateApp:
|
||||
self.recording_cleanup = RecordingCleanup(self.config, self.stop_event)
|
||||
self.recording_cleanup.start()
|
||||
|
||||
def start_storage_maintainer(self) -> None:
|
||||
self.storage_maintainer = StorageMaintainer(self.config, self.stop_event)
|
||||
self.storage_maintainer.start()
|
||||
|
||||
def start_stats_emitter(self) -> None:
|
||||
self.stats_emitter = StatsEmitter(
|
||||
self.config,
|
||||
@@ -369,6 +368,7 @@ class FrigateApp:
|
||||
self.start_event_cleanup()
|
||||
self.start_recording_maintainer()
|
||||
self.start_recording_cleanup()
|
||||
self.start_storage_maintainer()
|
||||
self.start_stats_emitter()
|
||||
self.start_watchdog()
|
||||
# self.zeroconf = broadcast_zeroconf(self.config.mqtt.client_id)
|
||||
|
||||
@@ -751,6 +751,7 @@ def recordings(camera_name):
|
||||
Recordings.id,
|
||||
Recordings.start_time,
|
||||
Recordings.end_time,
|
||||
Recordings.segment_size,
|
||||
Recordings.motion,
|
||||
Recordings.objects,
|
||||
)
|
||||
|
||||
@@ -41,3 +41,4 @@ class Recordings(Model): # type: ignore[misc]
|
||||
duration = FloatField()
|
||||
motion = IntegerField(null=True)
|
||||
objects = IntegerField(null=True)
|
||||
segment_size = FloatField(default=0) # this should be stored as MB
|
||||
|
||||
@@ -284,6 +284,15 @@ class RecordingMaintainer(threading.Thread):
|
||||
f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds."
|
||||
)
|
||||
|
||||
try:
|
||||
segment_size = round(
|
||||
float(os.path.getsize(cache_path)) / 1000000, 1
|
||||
)
|
||||
except OSError:
|
||||
segment_size = 0
|
||||
|
||||
os.remove(cache_path)
|
||||
|
||||
rand_id = "".join(
|
||||
random.choices(string.ascii_lowercase + string.digits, k=6)
|
||||
)
|
||||
@@ -297,10 +306,8 @@ class RecordingMaintainer(threading.Thread):
|
||||
motion=motion_count,
|
||||
# TODO: update this to store list of active objects at some point
|
||||
objects=active_count,
|
||||
segment_size=segment_size,
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Ignoring segment because {file_path} already exists.")
|
||||
os.remove(cache_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Unable to store recording segment {cache_path}")
|
||||
Path(cache_path).unlink(missing_ok=True)
|
||||
|
||||
172
frigate/storage.py
Normal file
172
frigate/storage.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""Handle storage retention and usage."""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import threading
|
||||
|
||||
from peewee import fn
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import RECORD_DIR
|
||||
from frigate.models import Event, Recordings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
bandwidth_equation = Recordings.segment_size / (
|
||||
Recordings.end_time - Recordings.start_time
|
||||
)
|
||||
|
||||
|
||||
class StorageMaintainer(threading.Thread):
|
||||
"""Maintain frigates recording storage."""
|
||||
|
||||
def __init__(self, config: FrigateConfig, stop_event) -> None:
|
||||
threading.Thread.__init__(self)
|
||||
self.name = "storage_maintainer"
|
||||
self.config = config
|
||||
self.stop_event = stop_event
|
||||
self.camera_storage_stats: dict[str, dict] = {}
|
||||
|
||||
def calculate_camera_bandwidth(self) -> None:
|
||||
"""Calculate an average MB/hr for each camera."""
|
||||
for camera in self.config.cameras.keys():
|
||||
# cameras with < 50 segments should be refreshed to keep size accurate
|
||||
# when few segments are available
|
||||
if self.camera_storage_stats.get(camera, {}).get("needs_refresh", True):
|
||||
self.camera_storage_stats[camera] = {
|
||||
"needs_refresh": (
|
||||
Recordings.select(fn.COUNT(Recordings.id))
|
||||
.where(
|
||||
Recordings.camera == camera, Recordings.segment_size != 0
|
||||
)
|
||||
.scalar()
|
||||
< 50
|
||||
)
|
||||
}
|
||||
|
||||
# calculate MB/hr
|
||||
try:
|
||||
bandwidth = round(
|
||||
Recordings.select(fn.AVG(bandwidth_equation))
|
||||
.where(Recordings.camera == camera, Recordings.segment_size != 0)
|
||||
.limit(100)
|
||||
.scalar()
|
||||
* 3600,
|
||||
2,
|
||||
)
|
||||
except TypeError:
|
||||
bandwidth = 0
|
||||
|
||||
self.camera_storage_stats[camera]["bandwidth"] = bandwidth
|
||||
logger.debug(f"{camera} has a bandwidth of {bandwidth} MB/hr.")
|
||||
|
||||
def check_storage_needs_cleanup(self) -> bool:
|
||||
"""Return if storage needs cleanup."""
|
||||
# currently runs cleanup if less than 1 hour of space is left
|
||||
# disk_usage should not spin up disks
|
||||
hourly_bandwidth = sum(
|
||||
[b["bandwidth"] for b in self.camera_storage_stats.values()]
|
||||
)
|
||||
remaining_storage = round(shutil.disk_usage(RECORD_DIR).free / 1000000, 1)
|
||||
logger.debug(
|
||||
f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}."
|
||||
)
|
||||
return remaining_storage < hourly_bandwidth
|
||||
|
||||
def reduce_storage_consumption(self) -> None:
|
||||
"""Remove oldest hour of recordings."""
|
||||
logger.debug("Starting storage cleanup.")
|
||||
deleted_segments_size = 0
|
||||
hourly_bandwidth = sum(
|
||||
[b["bandwidth"] for b in self.camera_storage_stats.values()]
|
||||
)
|
||||
|
||||
recordings: Recordings = Recordings.select().order_by(
|
||||
Recordings.start_time.asc()
|
||||
)
|
||||
retained_events: Event = (
|
||||
Event.select()
|
||||
.where(
|
||||
Event.retain_indefinitely == True,
|
||||
Event.has_clip,
|
||||
)
|
||||
.order_by(Event.start_time.asc())
|
||||
.objects()
|
||||
)
|
||||
|
||||
event_start = 0
|
||||
deleted_recordings = set()
|
||||
for recording in recordings.objects().iterator():
|
||||
# check if 1 hour of storage has been reclaimed
|
||||
if deleted_segments_size > hourly_bandwidth:
|
||||
break
|
||||
|
||||
keep = False
|
||||
|
||||
# Now look for a reason to keep this recording segment
|
||||
for idx in range(event_start, len(retained_events)):
|
||||
event = retained_events[idx]
|
||||
|
||||
# if the event starts in the future, stop checking events
|
||||
# and let this recording segment expire
|
||||
if event.start_time > recording.end_time:
|
||||
keep = False
|
||||
break
|
||||
|
||||
# if the event is in progress or ends after the recording starts, keep it
|
||||
# and stop looking at events
|
||||
if event.end_time is None or event.end_time >= recording.start_time:
|
||||
keep = True
|
||||
break
|
||||
|
||||
# if the event ends before this recording segment starts, skip
|
||||
# this event and check the next event for an overlap.
|
||||
# since the events and recordings are sorted, we can skip events
|
||||
# that end before the previous recording segment started on future segments
|
||||
if event.end_time < recording.start_time:
|
||||
event_start = idx
|
||||
|
||||
# Delete recordings not retained indefinitely
|
||||
if not keep:
|
||||
deleted_segments_size += recording.segment_size
|
||||
Path(recording.path).unlink(missing_ok=True)
|
||||
deleted_recordings.add(recording.id)
|
||||
|
||||
# check if need to delete retained segments
|
||||
if deleted_segments_size < hourly_bandwidth:
|
||||
logger.error(
|
||||
f"Could not clear {hourly_bandwidth} currently {deleted_segments_size}, retained recordings must be deleted."
|
||||
)
|
||||
recordings = Recordings.select().order_by(Recordings.start_time.asc())
|
||||
|
||||
for recording in recordings.objects().iterator():
|
||||
if deleted_segments_size > hourly_bandwidth:
|
||||
break
|
||||
|
||||
deleted_segments_size += recording.segment_size
|
||||
Path(recording.path).unlink(missing_ok=True)
|
||||
deleted_recordings.add(recording.id)
|
||||
|
||||
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
|
||||
# delete up to 100,000 at a time
|
||||
max_deletes = 100000
|
||||
deleted_recordings_list = list(deleted_recordings)
|
||||
for i in range(0, len(deleted_recordings_list), max_deletes):
|
||||
Recordings.delete().where(
|
||||
Recordings.id << deleted_recordings_list[i : i + max_deletes]
|
||||
).execute()
|
||||
|
||||
def run(self):
|
||||
"""Check every 5 minutes if storage needs to be cleaned up."""
|
||||
while not self.stop_event.wait(300):
|
||||
|
||||
if not self.camera_storage_stats or True in [
|
||||
r["needs_refresh"] for r in self.camera_storage_stats.values()
|
||||
]:
|
||||
self.calculate_camera_bandwidth()
|
||||
logger.debug(f"Default camera bandwidths: {self.camera_storage_stats}.")
|
||||
|
||||
if self.check_storage_needs_cleanup():
|
||||
self.reduce_storage_consumption()
|
||||
|
||||
logger.info(f"Exiting storage maintainer...")
|
||||
239
frigate/test/test_storage.py
Normal file
239
frigate/test/test_storage.py
Normal file
@@ -0,0 +1,239 @@
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from peewee import DoesNotExist
|
||||
from peewee_migrate import Router
|
||||
from playhouse.sqlite_ext import SqliteExtDatabase
|
||||
from playhouse.sqliteq import SqliteQueueDatabase
|
||||
from playhouse.shortcuts import model_to_dict
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.http import create_app
|
||||
from frigate.models import Event, Recordings
|
||||
from frigate.storage import StorageMaintainer
|
||||
|
||||
from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS
|
||||
|
||||
|
||||
class TestHttp(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# setup clean database for each test run
|
||||
migrate_db = SqliteExtDatabase("test.db")
|
||||
del logging.getLogger("peewee_migrate").handlers[:]
|
||||
router = Router(migrate_db)
|
||||
router.run()
|
||||
migrate_db.close()
|
||||
self.db = SqliteQueueDatabase(TEST_DB)
|
||||
models = [Event, Recordings]
|
||||
self.db.bind(models)
|
||||
|
||||
self.minimal_config = {
|
||||
"mqtt": {"host": "mqtt"},
|
||||
"cameras": {
|
||||
"front_door": {
|
||||
"ffmpeg": {
|
||||
"inputs": [
|
||||
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||||
]
|
||||
},
|
||||
"detect": {
|
||||
"height": 1080,
|
||||
"width": 1920,
|
||||
"fps": 5,
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
self.double_cam_config = {
|
||||
"mqtt": {"host": "mqtt"},
|
||||
"cameras": {
|
||||
"front_door": {
|
||||
"ffmpeg": {
|
||||
"inputs": [
|
||||
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||||
]
|
||||
},
|
||||
"detect": {
|
||||
"height": 1080,
|
||||
"width": 1920,
|
||||
"fps": 5,
|
||||
},
|
||||
},
|
||||
"back_door": {
|
||||
"ffmpeg": {
|
||||
"inputs": [
|
||||
{"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]}
|
||||
]
|
||||
},
|
||||
"detect": {
|
||||
"height": 1080,
|
||||
"width": 1920,
|
||||
"fps": 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
if not self.db.is_closed():
|
||||
self.db.close()
|
||||
|
||||
try:
|
||||
for file in TEST_DB_CLEANUPS:
|
||||
os.remove(file)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def test_segment_calculations(self):
|
||||
"""Test that the segment calculations are correct."""
|
||||
config = FrigateConfig(**self.double_cam_config)
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
time_keep = datetime.datetime.now().timestamp()
|
||||
rec_fd_id = "1234567.frontdoor"
|
||||
rec_bd_id = "1234568.backdoor"
|
||||
_insert_mock_recording(
|
||||
rec_fd_id,
|
||||
time_keep,
|
||||
time_keep + 10,
|
||||
camera="front_door",
|
||||
seg_size=4,
|
||||
seg_dur=10,
|
||||
)
|
||||
_insert_mock_recording(
|
||||
rec_bd_id,
|
||||
time_keep + 10,
|
||||
time_keep + 20,
|
||||
camera="back_door",
|
||||
seg_size=8,
|
||||
seg_dur=20,
|
||||
)
|
||||
storage.calculate_camera_bandwidth()
|
||||
assert storage.camera_storage_stats == {
|
||||
"front_door": {"bandwidth": 1440, "needs_refresh": True},
|
||||
"back_door": {"bandwidth": 2880, "needs_refresh": True},
|
||||
}
|
||||
|
||||
def test_segment_calculations_with_zero_segments(self):
|
||||
"""Ensure segment calculation does not fail when migrating from previous version."""
|
||||
config = FrigateConfig(**self.minimal_config)
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
time_keep = datetime.datetime.now().timestamp()
|
||||
rec_fd_id = "1234567.frontdoor"
|
||||
_insert_mock_recording(
|
||||
rec_fd_id,
|
||||
time_keep,
|
||||
time_keep + 10,
|
||||
camera="front_door",
|
||||
seg_size=0,
|
||||
seg_dur=10,
|
||||
)
|
||||
storage.calculate_camera_bandwidth()
|
||||
assert storage.camera_storage_stats == {
|
||||
"front_door": {"bandwidth": 0, "needs_refresh": True},
|
||||
}
|
||||
|
||||
def test_storage_cleanup(self):
|
||||
"""Ensure that all recordings are cleaned up when necessary."""
|
||||
config = FrigateConfig(**self.minimal_config)
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
id = "123456.keep"
|
||||
time_keep = datetime.datetime.now().timestamp()
|
||||
_insert_mock_event(id, time_keep, time_keep + 30, True)
|
||||
rec_k_id = "1234567.keep"
|
||||
rec_k2_id = "1234568.keep"
|
||||
rec_k3_id = "1234569.keep"
|
||||
_insert_mock_recording(rec_k_id, time_keep, time_keep + 10)
|
||||
_insert_mock_recording(rec_k2_id, time_keep + 10, time_keep + 20)
|
||||
_insert_mock_recording(rec_k3_id, time_keep + 20, time_keep + 30)
|
||||
|
||||
id2 = "7890.delete"
|
||||
time_delete = datetime.datetime.now().timestamp() - 360
|
||||
_insert_mock_event(id2, time_delete, time_delete + 30, False)
|
||||
rec_d_id = "78901.delete"
|
||||
rec_d2_id = "78902.delete"
|
||||
rec_d3_id = "78903.delete"
|
||||
_insert_mock_recording(rec_d_id, time_delete, time_delete + 10)
|
||||
_insert_mock_recording(rec_d2_id, time_delete + 10, time_delete + 20)
|
||||
_insert_mock_recording(rec_d3_id, time_delete + 20, time_delete + 30)
|
||||
|
||||
storage.calculate_camera_bandwidth()
|
||||
storage.reduce_storage_consumption()
|
||||
with self.assertRaises(DoesNotExist):
|
||||
assert Recordings.get(Recordings.id == rec_k_id)
|
||||
assert Recordings.get(Recordings.id == rec_k2_id)
|
||||
assert Recordings.get(Recordings.id == rec_k3_id)
|
||||
Recordings.get(Recordings.id == rec_d_id)
|
||||
Recordings.get(Recordings.id == rec_d2_id)
|
||||
Recordings.get(Recordings.id == rec_d3_id)
|
||||
|
||||
def test_storage_cleanup_keeps_retained(self):
|
||||
"""Ensure that all recordings are cleaned up when necessary."""
|
||||
config = FrigateConfig(**self.minimal_config)
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
id = "123456.keep"
|
||||
time_keep = datetime.datetime.now().timestamp()
|
||||
_insert_mock_event(id, time_keep, time_keep + 30, True)
|
||||
rec_k_id = "1234567.keep"
|
||||
rec_k2_id = "1234568.keep"
|
||||
rec_k3_id = "1234569.keep"
|
||||
_insert_mock_recording(rec_k_id, time_keep, time_keep + 10)
|
||||
_insert_mock_recording(rec_k2_id, time_keep + 10, time_keep + 20)
|
||||
_insert_mock_recording(rec_k3_id, time_keep + 20, time_keep + 30)
|
||||
|
||||
time_delete = datetime.datetime.now().timestamp() - 7200
|
||||
for i in range(0, 59):
|
||||
_insert_mock_recording(
|
||||
f"{123456 + i}.delete", time_delete, time_delete + 600
|
||||
)
|
||||
|
||||
storage.calculate_camera_bandwidth()
|
||||
storage.reduce_storage_consumption()
|
||||
assert Recordings.get(Recordings.id == rec_k_id)
|
||||
assert Recordings.get(Recordings.id == rec_k2_id)
|
||||
assert Recordings.get(Recordings.id == rec_k3_id)
|
||||
|
||||
|
||||
def _insert_mock_event(id: str, start: int, end: int, retain: bool) -> Event:
|
||||
"""Inserts a basic event model with a given id."""
|
||||
return Event.insert(
|
||||
id=id,
|
||||
label="Mock",
|
||||
camera="front_door",
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
top_score=100,
|
||||
false_positive=False,
|
||||
zones=list(),
|
||||
thumbnail="",
|
||||
region=[],
|
||||
box=[],
|
||||
area=0,
|
||||
has_clip=True,
|
||||
has_snapshot=True,
|
||||
retain_indefinitely=retain,
|
||||
).execute()
|
||||
|
||||
|
||||
def _insert_mock_recording(
|
||||
id: str, start: int, end: int, camera="front_door", seg_size=8, seg_dur=10
|
||||
) -> Event:
|
||||
"""Inserts a basic recording model with a given id."""
|
||||
return Recordings.insert(
|
||||
id=id,
|
||||
camera=camera,
|
||||
path=f"/recordings/{id}",
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
duration=seg_dur,
|
||||
motion=True,
|
||||
objects=True,
|
||||
segment_size=seg_size,
|
||||
).execute()
|
||||
Reference in New Issue
Block a user