Move more things out of FrigateApp (#13897)

* Moved FrigateApp.init_config() into FrigateConfig.load()

* Move frigate config loading into main

* Store PlusApi in FrigateConfig

* Register SIGTERM handler in main

* Ensure logging is setup during config parsing

* Removed pointless try

* Moved config initialization out of FrigateApp

* Made FrigateApp.shm_frame_count into a function

* Removed log calls from signal handlers

python's logging calls are not re-entrant, which caused at least one of
these to deadlock randomly.

* Reopen stdout/err on process fork

This helps avoid deadlocks (https://github.com/python/cpython/issues/91776).

* Make mypy happy

* Whoops. I might have forgotten to save.

Truly an amateur mistake.

* Always call FrigateApp.stop()
This commit is contained in:
gtsiam
2024-09-24 15:07:47 +03:00
committed by GitHub
parent a7ed90f042
commit dc54981784
15 changed files with 215 additions and 213 deletions

View File

@@ -1,23 +1,17 @@
import argparse
import datetime
import logging
import multiprocessing as mp
import os
import secrets
import shutil
import signal
import sys
import traceback
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from types import FrameType
from typing import Optional
from typing import Any
import psutil
from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
from pydantic import ValidationError
from frigate.api.app import create_app
from frigate.api.auth import hash_password
@@ -28,7 +22,6 @@ from frigate.comms.mqtt import MqttClient
from frigate.comms.webpush import WebPushClient
from frigate.comms.ws import WebSocketClient
from frigate.comms.zmq_proxy import ZmqProxy
from frigate.config import FrigateConfig
from frigate.const import (
CACHE_DIR,
CLIPS_DIR,
@@ -43,7 +36,6 @@ from frigate.events.audio import listen_to_audio
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
from frigate.log import log_thread
from frigate.models import (
Event,
Export,
@@ -58,7 +50,6 @@ from frigate.models import (
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.output.output import output_frames
from frigate.plus import PlusApi
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup
@@ -70,8 +61,7 @@ from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, PTZMetricsTypes
from frigate.util.builtin import empty_and_close_queue, save_default_config
from frigate.util.config import migrate_frigate_config
from frigate.util.builtin import empty_and_close_queue
from frigate.util.object import get_camera_regions_grid
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
@@ -81,22 +71,19 @@ logger = logging.getLogger(__name__)
class FrigateApp:
def __init__(self) -> None:
# TODO: Fix FrigateConfig usage, so we can properly annotate it here without mypy erroring out.
def __init__(self, config: Any) -> None:
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.plus_api = PlusApi()
self.camera_metrics: dict[str, CameraMetricsTypes] = {}
self.ptz_metrics: dict[str, PTZMetricsTypes] = {}
self.processes: dict[str, int] = {}
self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
def set_environment_vars(self) -> None:
for key, value in self.config.environment_vars.items():
os.environ[key] = value
self.config = config
def ensure_dirs(self) -> None:
for d in [
@@ -113,24 +100,7 @@ class FrigateApp:
else:
logger.debug(f"Skipping directory: {d}")
def init_config(self) -> None:
config_file = os.environ.get("CONFIG_FILE", "/config/config.yml")
# Check if we can use .yaml instead of .yml
config_file_yaml = config_file.replace(".yml", ".yaml")
if os.path.isfile(config_file_yaml):
config_file = config_file_yaml
if not os.path.isfile(config_file):
print("No config file found, saving default config")
config_file = config_file_yaml
save_default_config(config_file)
# check if the config file needs to be migrated
migrate_frigate_config(config_file)
self.config = FrigateConfig.parse_file(config_file, plus_api=self.plus_api)
def init_camera_metrics(self) -> None:
for camera_name in self.config.cameras.keys():
# create camera_metrics
self.camera_metrics[camera_name] = {
@@ -190,17 +160,6 @@ class FrigateApp:
}
self.ptz_metrics[camera_name]["ptz_motor_stopped"].set()
def set_log_levels(self) -> None:
logging.getLogger().setLevel(self.config.logger.default.value.upper())
for log, level in self.config.logger.logs.items():
logging.getLogger(log).setLevel(level.value.upper())
if "werkzeug" not in self.config.logger.logs:
logging.getLogger("werkzeug").setLevel("ERROR")
if "ws4py" not in self.config.logger.logs:
logging.getLogger("ws4py").setLevel("ERROR")
def init_queues(self) -> None:
# Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = mp.Queue(
@@ -374,19 +333,6 @@ class FrigateApp:
self.inter_config_updater = ConfigPublisher()
self.inter_zmq_proxy = ZmqProxy()
def init_web_server(self) -> None:
self.flask_app = create_app(
self.config,
self.db,
self.embeddings,
self.detected_frames_processor,
self.storage_maintainer,
self.onvif_controller,
self.external_event_processor,
self.plus_api,
self.stats_emitter,
)
def init_onvif(self) -> None:
self.onvif_controller = OnvifController(self.config, self.ptz_metrics)
@@ -527,7 +473,7 @@ class FrigateApp:
capture_process = mp.Process(
target=capture_camera,
name=f"camera_capture:{name}",
args=(name, config, self.shm_frame_count, self.camera_metrics[name]),
args=(name, config, self.shm_frame_count(), self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name]["capture_process"] = capture_process
@@ -590,7 +536,7 @@ class FrigateApp:
self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event)
self.frigate_watchdog.start()
def check_shm(self) -> None:
def shm_frame_count(self) -> int:
total_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1)
# required for log files + nginx cache
@@ -610,17 +556,19 @@ class FrigateApp:
1,
)
self.shm_frame_count = min(50, int(available_shm / (cam_total_frame_size)))
shm_frame_count = min(50, int(available_shm / (cam_total_frame_size)))
logger.debug(
f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {self.shm_frame_count} frames for each camera in SHM"
f"Calculated total camera size {available_shm} / {cam_total_frame_size} :: {shm_frame_count} frames for each camera in SHM"
)
if self.shm_frame_count < 10:
if shm_frame_count < 10:
logger.warning(
f"The current SHM size of {total_shm}MB is too small, recommend increasing it to at least {round(min_req_shm + cam_total_frame_size)}MB."
)
return shm_frame_count
def init_auth(self) -> None:
if self.config.auth.enabled:
if User.select().count() == 0:
@@ -657,97 +605,58 @@ class FrigateApp:
logger.info("********************************************************")
logger.info("********************************************************")
@log_thread()
def start(self) -> None:
parser = argparse.ArgumentParser(
prog="Frigate",
description="An NVR with realtime local object detection for IP cameras.",
)
parser.add_argument("--validate-config", action="store_true")
args = parser.parse_args()
logger.info(f"Starting Frigate ({VERSION})")
try:
self.ensure_dirs()
try:
self.init_config()
except Exception as e:
print("*************************************************************")
print("*************************************************************")
print("*** Your config file is not valid! ***")
print("*** Please check the docs at ***")
print("*** https://docs.frigate.video/configuration/index ***")
print("*************************************************************")
print("*************************************************************")
print("*** Config Validation Errors ***")
print("*************************************************************")
if isinstance(e, ValidationError):
for error in e.errors():
location = ".".join(str(item) for item in error["loc"])
print(f"{location}: {error['msg']}")
else:
print(e)
print(traceback.format_exc())
print("*************************************************************")
print("*** End Config Validation Errors ***")
print("*************************************************************")
sys.exit(1)
if args.validate_config:
print("*************************************************************")
print("*** Your config file is valid. ***")
print("*************************************************************")
sys.exit(0)
self.set_environment_vars()
self.set_log_levels()
self.init_queues()
self.init_database()
self.init_onvif()
self.init_recording_manager()
self.init_review_segment_manager()
self.init_embeddings_manager()
self.init_go2rtc()
self.bind_database()
self.check_db_data_migrations()
self.init_inter_process_communicator()
self.init_dispatcher()
except Exception as e:
print(e)
sys.exit(1)
# Ensure global state.
self.ensure_dirs()
self.config.install()
# Start frigate services.
self.init_camera_metrics()
self.init_queues()
self.init_database()
self.init_onvif()
self.init_recording_manager()
self.init_review_segment_manager()
self.init_embeddings_manager()
self.init_go2rtc()
self.bind_database()
self.check_db_data_migrations()
self.init_inter_process_communicator()
self.init_dispatcher()
self.start_detectors()
self.start_video_output_processor()
self.start_ptz_autotracker()
self.init_historical_regions()
self.start_detected_frames_processor()
self.start_camera_processors()
self.check_shm()
self.start_camera_capture_processes()
self.start_audio_processors()
self.start_storage_maintainer()
self.init_external_event_processor()
self.start_stats_emitter()
self.init_web_server()
self.start_timeline_processor()
self.start_event_processor()
self.start_event_cleanup()
self.start_record_cleanup()
self.start_watchdog()
self.init_auth()
# Flask only listens for SIGINT, so we need to catch SIGTERM and send SIGINT
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
os.kill(os.getpid(), signal.SIGINT)
signal.signal(signal.SIGTERM, receiveSignal)
try:
self.flask_app.run(host="127.0.0.1", port=5001, debug=False, threaded=True)
except KeyboardInterrupt:
pass
logger.info("Flask has exited...")
self.stop()
create_app(
self.config,
self.db,
self.embeddings,
self.detected_frames_processor,
self.storage_maintainer,
self.onvif_controller,
self.external_event_processor,
self.stats_emitter,
).run(host="127.0.0.1", port=5001, debug=False, threaded=True)
finally:
self.stop()
def stop(self) -> None:
logger.info("Stopping...")