Custom classes for Process and Metrics (#13950)

* Subclass Process for audio_process

* Introduce custom mp.Process subclass

In preparation to switch the multiprocessing startup method away from
"fork", we cannot rely on os.fork cloning the log state at fork time.
Instead, we have to set up logging before we run the business logic of
each process.

* Make camera_metrics into a class

* Make ptz_metrics into a class

* Fixed PtzMotionEstimator.ptz_metrics type annotation

* Removed pointless variables

* Do not start audio processor when no audio cameras are configured
This commit is contained in:
gtsiam
2024-09-27 15:53:23 +03:00
committed by GitHub
parent 1f328be1bd
commit c0bd3b362c
16 changed files with 471 additions and 448 deletions

View File

@@ -5,13 +5,9 @@ import os
import sys
import threading
from collections import deque
from contextlib import AbstractContextManager, ContextDecorator
from logging.handlers import QueueHandler, QueueListener
from types import TracebackType
from typing import Deque, Optional
from typing_extensions import Self
from frigate.util.builtin import clean_camera_user_pass
LOG_HANDLER = logging.StreamHandler()
@@ -28,45 +24,33 @@ LOG_HANDLER.addFilter(
)
)
log_listener: Optional[QueueListener] = None
class log_thread(AbstractContextManager, ContextDecorator):
def __init__(self, *, handler: logging.Handler = LOG_HANDLER):
super().__init__()
self._handler = handler
def setup_logging() -> None:
global log_listener
log_queue: mp.Queue = mp.Queue()
self._queue_handler = QueueHandler(log_queue)
log_queue: mp.Queue = mp.Queue()
log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True)
self._log_listener = QueueListener(
log_queue, self._handler, respect_handler_level=True
)
atexit.register(_stop_logging)
log_listener.start()
@property
def handler(self) -> logging.Handler:
return self._handler
logging.basicConfig(
level=logging.INFO,
handlers=[],
force=True,
)
def _stop_thread(self) -> None:
self._log_listener.stop()
logging.getLogger().addHandler(QueueHandler(log_listener.queue))
def __enter__(self) -> Self:
logging.getLogger().addHandler(self._queue_handler)
atexit.register(self._stop_thread)
self._log_listener.start()
def _stop_logging() -> None:
global log_listener
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_info: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
logging.getLogger().removeHandler(self._queue_handler)
atexit.unregister(self._stop_thread)
self._stop_thread()
if log_listener is not None:
log_listener.stop()
log_listener = None
# When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the