forked from Github/frigate
0
frigate/stats/__init__.py
Normal file
0
frigate/stats/__init__.py
Normal file
61
frigate/stats/emitter.py
Normal file
61
frigate/stats/emitter.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Emit stats to listeners."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from multiprocessing.synchronize import Event as MpEvent
|
||||
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.stats.util import stats_snapshot
|
||||
from frigate.types import StatsTrackingTypes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StatsEmitter(threading.Thread):
|
||||
def __init__(
|
||||
self,
|
||||
config: FrigateConfig,
|
||||
stats_tracking: StatsTrackingTypes,
|
||||
stop_event: MpEvent,
|
||||
):
|
||||
threading.Thread.__init__(self)
|
||||
self.name = "frigate_stats_emitter"
|
||||
self.config = config
|
||||
self.stats_tracking = stats_tracking
|
||||
self.stop_event = stop_event
|
||||
self.hwaccel_errors: list[str] = []
|
||||
self.stats_history: list[dict[str, any]] = []
|
||||
|
||||
# create communication for stats
|
||||
self.requestor = InterProcessRequestor()
|
||||
|
||||
def get_latest_stats(self) -> dict[str, any]:
|
||||
"""Get latest stats."""
|
||||
if len(self.stats_history) > 0:
|
||||
return self.stats_history[-1]
|
||||
else:
|
||||
stats = stats_snapshot(
|
||||
self.config, self.stats_tracking, self.hwaccel_errors
|
||||
)
|
||||
self.stats_history.append(stats)
|
||||
return stats
|
||||
|
||||
def get_stats_history(self) -> list[dict[str, any]]:
|
||||
"""Get stats history."""
|
||||
return self.stats_history
|
||||
|
||||
def run(self) -> None:
|
||||
time.sleep(10)
|
||||
while not self.stop_event.wait(self.config.mqtt.stats_interval):
|
||||
logger.debug("Starting stats collection")
|
||||
stats = stats_snapshot(
|
||||
self.config, self.stats_tracking, self.hwaccel_errors
|
||||
)
|
||||
self.stats_history.append(stats)
|
||||
self.stats_history = self.stats_history[-10:]
|
||||
self.requestor.send_data("stats", json.dumps(stats))
|
||||
logger.debug("Finished stats collection")
|
||||
logger.info("Exiting stats emitter...")
|
||||
315
frigate/stats/util.py
Normal file
315
frigate/stats/util.py
Normal file
@@ -0,0 +1,315 @@
|
||||
"""Utilities for stats."""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from typing import Any, Optional
|
||||
|
||||
import psutil
|
||||
import requests
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
|
||||
from frigate.object_detection import ObjectDetectProcess
|
||||
from frigate.types import CameraMetricsTypes, StatsTrackingTypes
|
||||
from frigate.util.services import (
|
||||
get_amd_gpu_stats,
|
||||
get_bandwidth_stats,
|
||||
get_cpu_stats,
|
||||
get_intel_gpu_stats,
|
||||
get_jetson_stats,
|
||||
get_nvidia_gpu_stats,
|
||||
is_vaapi_amd_driver,
|
||||
)
|
||||
from frigate.version import VERSION
|
||||
|
||||
|
||||
def get_latest_version(config: FrigateConfig) -> str:
|
||||
if not config.telemetry.version_check:
|
||||
return "disabled"
|
||||
|
||||
try:
|
||||
request = requests.get(
|
||||
"https://api.github.com/repos/blakeblackshear/frigate/releases/latest",
|
||||
timeout=10,
|
||||
)
|
||||
except RequestException:
|
||||
return "unknown"
|
||||
|
||||
response = request.json()
|
||||
|
||||
if request.ok and response and "tag_name" in response:
|
||||
return str(response.get("tag_name").replace("v", ""))
|
||||
else:
|
||||
return "unknown"
|
||||
|
||||
|
||||
def stats_init(
|
||||
config: FrigateConfig,
|
||||
camera_metrics: dict[str, CameraMetricsTypes],
|
||||
detectors: dict[str, ObjectDetectProcess],
|
||||
processes: dict[str, int],
|
||||
) -> StatsTrackingTypes:
|
||||
stats_tracking: StatsTrackingTypes = {
|
||||
"camera_metrics": camera_metrics,
|
||||
"detectors": detectors,
|
||||
"started": int(time.time()),
|
||||
"latest_frigate_version": get_latest_version(config),
|
||||
"last_updated": int(time.time()),
|
||||
"processes": processes,
|
||||
}
|
||||
return stats_tracking
|
||||
|
||||
|
||||
def get_fs_type(path: str) -> str:
|
||||
bestMatch = ""
|
||||
fsType = ""
|
||||
for part in psutil.disk_partitions(all=True):
|
||||
if path.startswith(part.mountpoint) and len(bestMatch) < len(part.mountpoint):
|
||||
fsType = part.fstype
|
||||
bestMatch = part.mountpoint
|
||||
return fsType
|
||||
|
||||
|
||||
def read_temperature(path: str) -> Optional[float]:
|
||||
if os.path.isfile(path):
|
||||
with open(path) as f:
|
||||
line = f.readline().strip()
|
||||
return int(line) / 1000
|
||||
return None
|
||||
|
||||
|
||||
def get_temperatures() -> dict[str, float]:
|
||||
temps = {}
|
||||
|
||||
# Get temperatures for all attached Corals
|
||||
base = "/sys/class/apex/"
|
||||
if os.path.isdir(base):
|
||||
for apex in os.listdir(base):
|
||||
temp = read_temperature(os.path.join(base, apex, "temp"))
|
||||
if temp is not None:
|
||||
temps[apex] = temp
|
||||
|
||||
return temps
|
||||
|
||||
|
||||
def get_processing_stats(
|
||||
config: FrigateConfig, stats: dict[str, str], hwaccel_errors: list[str]
|
||||
) -> None:
|
||||
"""Get stats for cpu / gpu."""
|
||||
|
||||
async def run_tasks() -> None:
|
||||
stats_tasks = [
|
||||
asyncio.create_task(set_gpu_stats(config, stats, hwaccel_errors)),
|
||||
asyncio.create_task(set_cpu_stats(stats)),
|
||||
]
|
||||
|
||||
if config.telemetry.stats.network_bandwidth:
|
||||
stats_tasks.append(asyncio.create_task(set_bandwidth_stats(config, stats)))
|
||||
|
||||
await asyncio.wait(stats_tasks)
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(run_tasks())
|
||||
loop.close()
|
||||
|
||||
|
||||
async def set_cpu_stats(all_stats: dict[str, Any]) -> None:
|
||||
"""Set cpu usage from top."""
|
||||
cpu_stats = get_cpu_stats()
|
||||
|
||||
if cpu_stats:
|
||||
all_stats["cpu_usages"] = cpu_stats
|
||||
|
||||
|
||||
async def set_bandwidth_stats(config: FrigateConfig, all_stats: dict[str, Any]) -> None:
|
||||
"""Set bandwidth from nethogs."""
|
||||
bandwidth_stats = get_bandwidth_stats(config)
|
||||
|
||||
if bandwidth_stats:
|
||||
all_stats["bandwidth_usages"] = bandwidth_stats
|
||||
|
||||
|
||||
async def set_gpu_stats(
|
||||
config: FrigateConfig, all_stats: dict[str, Any], hwaccel_errors: list[str]
|
||||
) -> None:
|
||||
"""Parse GPUs from hwaccel args and use for stats."""
|
||||
hwaccel_args = []
|
||||
|
||||
for camera in config.cameras.values():
|
||||
args = camera.ffmpeg.hwaccel_args
|
||||
|
||||
if isinstance(args, list):
|
||||
args = " ".join(args)
|
||||
|
||||
if args and args not in hwaccel_args:
|
||||
hwaccel_args.append(args)
|
||||
|
||||
for stream_input in camera.ffmpeg.inputs:
|
||||
args = stream_input.hwaccel_args
|
||||
|
||||
if isinstance(args, list):
|
||||
args = " ".join(args)
|
||||
|
||||
if args and args not in hwaccel_args:
|
||||
hwaccel_args.append(args)
|
||||
|
||||
stats: dict[str, dict] = {}
|
||||
|
||||
for args in hwaccel_args:
|
||||
if args in hwaccel_errors:
|
||||
# known erroring args should automatically return as error
|
||||
stats["error-gpu"] = {"gpu": -1, "mem": -1}
|
||||
elif "cuvid" in args or "nvidia" in args:
|
||||
# nvidia GPU
|
||||
nvidia_usage = get_nvidia_gpu_stats()
|
||||
|
||||
if nvidia_usage:
|
||||
for i in range(len(nvidia_usage)):
|
||||
stats[nvidia_usage[i]["name"]] = {
|
||||
"gpu": str(round(float(nvidia_usage[i]["gpu"]), 2)) + "%",
|
||||
"mem": str(round(float(nvidia_usage[i]["mem"]), 2)) + "%",
|
||||
"enc": str(round(float(nvidia_usage[i]["enc"]), 2)) + "%",
|
||||
"dec": str(round(float(nvidia_usage[i]["dec"]), 2)) + "%",
|
||||
}
|
||||
|
||||
else:
|
||||
stats["nvidia-gpu"] = {"gpu": -1, "mem": -1}
|
||||
hwaccel_errors.append(args)
|
||||
elif "nvmpi" in args or "jetson" in args:
|
||||
# nvidia Jetson
|
||||
jetson_usage = get_jetson_stats()
|
||||
|
||||
if jetson_usage:
|
||||
stats["jetson-gpu"] = jetson_usage
|
||||
else:
|
||||
stats["jetson-gpu"] = {"gpu": -1, "mem": -1}
|
||||
hwaccel_errors.append(args)
|
||||
elif "qsv" in args:
|
||||
if not config.telemetry.stats.intel_gpu_stats:
|
||||
continue
|
||||
|
||||
# intel QSV GPU
|
||||
intel_usage = get_intel_gpu_stats()
|
||||
|
||||
if intel_usage:
|
||||
stats["intel-qsv"] = intel_usage
|
||||
else:
|
||||
stats["intel-qsv"] = {"gpu": -1, "mem": -1}
|
||||
hwaccel_errors.append(args)
|
||||
elif "vaapi" in args:
|
||||
if is_vaapi_amd_driver():
|
||||
if not config.telemetry.stats.amd_gpu_stats:
|
||||
continue
|
||||
|
||||
# AMD VAAPI GPU
|
||||
amd_usage = get_amd_gpu_stats()
|
||||
|
||||
if amd_usage:
|
||||
stats["amd-vaapi"] = amd_usage
|
||||
else:
|
||||
stats["amd-vaapi"] = {"gpu": -1, "mem": -1}
|
||||
hwaccel_errors.append(args)
|
||||
else:
|
||||
if not config.telemetry.stats.intel_gpu_stats:
|
||||
continue
|
||||
|
||||
# intel VAAPI GPU
|
||||
intel_usage = get_intel_gpu_stats()
|
||||
|
||||
if intel_usage:
|
||||
stats["intel-vaapi"] = intel_usage
|
||||
else:
|
||||
stats["intel-vaapi"] = {"gpu": -1, "mem": -1}
|
||||
hwaccel_errors.append(args)
|
||||
elif "v4l2m2m" in args or "rpi" in args:
|
||||
# RPi v4l2m2m is currently not able to get usage stats
|
||||
stats["rpi-v4l2m2m"] = {"gpu": -1, "mem": -1}
|
||||
|
||||
if stats:
|
||||
all_stats["gpu_usages"] = stats
|
||||
|
||||
|
||||
def stats_snapshot(
|
||||
config: FrigateConfig, stats_tracking: StatsTrackingTypes, hwaccel_errors: list[str]
|
||||
) -> dict[str, Any]:
|
||||
"""Get a snapshot of the current stats that are being tracked."""
|
||||
camera_metrics = stats_tracking["camera_metrics"]
|
||||
stats: dict[str, Any] = {}
|
||||
|
||||
total_detection_fps = 0
|
||||
|
||||
stats["cameras"] = {}
|
||||
for name, camera_stats in camera_metrics.items():
|
||||
total_detection_fps += camera_stats["detection_fps"].value
|
||||
pid = camera_stats["process"].pid if camera_stats["process"] else None
|
||||
ffmpeg_pid = (
|
||||
camera_stats["ffmpeg_pid"].value if camera_stats["ffmpeg_pid"] else None
|
||||
)
|
||||
cpid = (
|
||||
camera_stats["capture_process"].pid
|
||||
if camera_stats["capture_process"]
|
||||
else None
|
||||
)
|
||||
stats["cameras"][name] = {
|
||||
"camera_fps": round(camera_stats["camera_fps"].value, 2),
|
||||
"process_fps": round(camera_stats["process_fps"].value, 2),
|
||||
"skipped_fps": round(camera_stats["skipped_fps"].value, 2),
|
||||
"detection_fps": round(camera_stats["detection_fps"].value, 2),
|
||||
"detection_enabled": config.cameras[name].detect.enabled,
|
||||
"pid": pid,
|
||||
"capture_pid": cpid,
|
||||
"ffmpeg_pid": ffmpeg_pid,
|
||||
"audio_rms": round(camera_stats["audio_rms"].value, 4),
|
||||
"audio_dBFS": round(camera_stats["audio_dBFS"].value, 4),
|
||||
}
|
||||
|
||||
stats["detectors"] = {}
|
||||
for name, detector in stats_tracking["detectors"].items():
|
||||
pid = detector.detect_process.pid if detector.detect_process else None
|
||||
stats["detectors"][name] = {
|
||||
"inference_speed": round(detector.avg_inference_speed.value * 1000, 2), # type: ignore[attr-defined]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
"detection_start": detector.detection_start.value, # type: ignore[attr-defined]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
"pid": pid,
|
||||
}
|
||||
stats["detection_fps"] = round(total_detection_fps, 2)
|
||||
|
||||
get_processing_stats(config, stats, hwaccel_errors)
|
||||
|
||||
stats["service"] = {
|
||||
"uptime": (int(time.time()) - stats_tracking["started"]),
|
||||
"version": VERSION,
|
||||
"latest_version": stats_tracking["latest_frigate_version"],
|
||||
"storage": {},
|
||||
"temperatures": get_temperatures(),
|
||||
"last_updated": int(time.time()),
|
||||
}
|
||||
|
||||
for path in [RECORD_DIR, CLIPS_DIR, CACHE_DIR, "/dev/shm"]:
|
||||
try:
|
||||
storage_stats = shutil.disk_usage(path)
|
||||
except FileNotFoundError:
|
||||
stats["service"]["storage"][path] = {}
|
||||
continue
|
||||
|
||||
stats["service"]["storage"][path] = {
|
||||
"total": round(storage_stats.total / pow(2, 20), 1),
|
||||
"used": round(storage_stats.used / pow(2, 20), 1),
|
||||
"free": round(storage_stats.free / pow(2, 20), 1),
|
||||
"mount_type": get_fs_type(path),
|
||||
}
|
||||
|
||||
stats["processes"] = {}
|
||||
for name, pid in stats_tracking["processes"].items():
|
||||
stats["processes"][name] = {
|
||||
"pid": pid,
|
||||
}
|
||||
|
||||
return stats
|
||||
Reference in New Issue
Block a user