forked from Github/frigate
Convert detectors to factory pattern, ability to set different model for each detector (#4635)
* refactor detectors * move create_detector and DetectorTypeEnum * fixed code formatting * add detector model config models * fix detector unit tests * adjust SharedMemory size to largest detector model shape * fix detector model config defaults * enable auto-discovery of detectors * simplify config * simplify config changes further * update detectors docs; detect detector configs dynamic * add suggested changes * remove custom detector doc * fix grammar, adjust device defaults
This commit is contained in:
@@ -0,0 +1,24 @@
|
||||
import logging
|
||||
|
||||
from .detection_api import DetectionApi
|
||||
from .detector_config import (
|
||||
PixelFormatEnum,
|
||||
InputTensorEnum,
|
||||
ModelConfig,
|
||||
)
|
||||
from .detector_types import DetectorTypeEnum, api_types, DetectorConfig
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_detector(detector_config):
|
||||
if detector_config.type == DetectorTypeEnum.cpu:
|
||||
logger.warning(
|
||||
"CPU detectors are not recommended and should only be used for testing or for trial purposes."
|
||||
)
|
||||
|
||||
api = api_types.get(detector_config.type)
|
||||
if not api:
|
||||
raise ValueError(detector_config.type)
|
||||
return api(detector_config)
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import logging
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DetectionApi(ABC):
|
||||
type_key: str
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, det_device=None, model_config=None):
|
||||
def __init__(self, detector_config):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
|
||||
78
frigate/detectors/detector_config.py
Normal file
78
frigate/detectors/detector_config.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import logging
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Optional, Tuple, Union, Literal
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
from pydantic import BaseModel, Extra, Field, validator
|
||||
from pydantic.fields import PrivateAttr
|
||||
|
||||
from frigate.util import load_labels
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PixelFormatEnum(str, Enum):
|
||||
rgb = "rgb"
|
||||
bgr = "bgr"
|
||||
yuv = "yuv"
|
||||
|
||||
|
||||
class InputTensorEnum(str, Enum):
|
||||
nchw = "nchw"
|
||||
nhwc = "nhwc"
|
||||
|
||||
|
||||
class ModelConfig(BaseModel):
|
||||
path: Optional[str] = Field(title="Custom Object detection model path.")
|
||||
labelmap_path: Optional[str] = Field(title="Label map for custom object detector.")
|
||||
width: int = Field(default=320, title="Object detection model input width.")
|
||||
height: int = Field(default=320, title="Object detection model input height.")
|
||||
labelmap: Dict[int, str] = Field(
|
||||
default_factory=dict, title="Labelmap customization."
|
||||
)
|
||||
input_tensor: InputTensorEnum = Field(
|
||||
default=InputTensorEnum.nhwc, title="Model Input Tensor Shape"
|
||||
)
|
||||
input_pixel_format: PixelFormatEnum = Field(
|
||||
default=PixelFormatEnum.rgb, title="Model Input Pixel Color Format"
|
||||
)
|
||||
_merged_labelmap: Optional[Dict[int, str]] = PrivateAttr()
|
||||
_colormap: Dict[int, Tuple[int, int, int]] = PrivateAttr()
|
||||
|
||||
@property
|
||||
def merged_labelmap(self) -> Dict[int, str]:
|
||||
return self._merged_labelmap
|
||||
|
||||
@property
|
||||
def colormap(self) -> Dict[int, Tuple[int, int, int]]:
|
||||
return self._colormap
|
||||
|
||||
def __init__(self, **config):
|
||||
super().__init__(**config)
|
||||
|
||||
self._merged_labelmap = {
|
||||
**load_labels(config.get("labelmap_path", "/labelmap.txt")),
|
||||
**config.get("labelmap", {}),
|
||||
}
|
||||
|
||||
cmap = plt.cm.get_cmap("tab10", len(self._merged_labelmap.keys()))
|
||||
|
||||
self._colormap = {}
|
||||
for key, val in self._merged_labelmap.items():
|
||||
self._colormap[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])
|
||||
|
||||
class Config:
|
||||
extra = Extra.forbid
|
||||
|
||||
|
||||
class BaseDetectorConfig(BaseModel):
|
||||
# the type field must be defined in all subclasses
|
||||
type: str = Field(default="cpu", title="Detector Type")
|
||||
model: ModelConfig = Field(
|
||||
default=None, title="Detector specific model configuration."
|
||||
)
|
||||
|
||||
class Config:
|
||||
extra = Extra.allow
|
||||
arbitrary_types_allowed = True
|
||||
35
frigate/detectors/detector_types.py
Normal file
35
frigate/detectors/detector_types.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import logging
|
||||
import importlib
|
||||
import pkgutil
|
||||
from typing import Union
|
||||
from typing_extensions import Annotated
|
||||
from enum import Enum
|
||||
from pydantic import Field
|
||||
|
||||
from . import plugins
|
||||
from .detection_api import DetectionApi
|
||||
from .detector_config import BaseDetectorConfig
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
plugin_modules = [
|
||||
importlib.import_module(name)
|
||||
for finder, name, ispkg in pkgutil.iter_modules(
|
||||
plugins.__path__, plugins.__name__ + "."
|
||||
)
|
||||
]
|
||||
|
||||
api_types = {det.type_key: det for det in DetectionApi.__subclasses__()}
|
||||
|
||||
|
||||
class StrEnum(str, Enum):
|
||||
pass
|
||||
|
||||
|
||||
DetectorTypeEnum = StrEnum("DetectorTypeEnum", {k: k for k in api_types})
|
||||
|
||||
DetectorConfig = Annotated[
|
||||
Union[tuple(BaseDetectorConfig.__subclasses__())],
|
||||
Field(discriminator="type"),
|
||||
]
|
||||
0
frigate/detectors/plugins/__init__.py
Normal file
0
frigate/detectors/plugins/__init__.py
Normal file
@@ -2,15 +2,29 @@ import logging
|
||||
import numpy as np
|
||||
|
||||
from frigate.detectors.detection_api import DetectionApi
|
||||
from frigate.detectors.detector_config import BaseDetectorConfig
|
||||
from typing import Literal
|
||||
from pydantic import Extra, Field
|
||||
import tflite_runtime.interpreter as tflite
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DETECTOR_KEY = "cpu"
|
||||
|
||||
|
||||
class CpuDetectorConfig(BaseDetectorConfig):
|
||||
type: Literal[DETECTOR_KEY]
|
||||
num_threads: int = Field(default=3, title="Number of detection threads")
|
||||
|
||||
|
||||
class CpuTfl(DetectionApi):
|
||||
def __init__(self, det_device=None, model_config=None, num_threads=3):
|
||||
type_key = DETECTOR_KEY
|
||||
|
||||
def __init__(self, detector_config: CpuDetectorConfig):
|
||||
self.interpreter = tflite.Interpreter(
|
||||
model_path=model_config.path or "/cpu_model.tflite", num_threads=num_threads
|
||||
model_path=detector_config.model.path or "/cpu_model.tflite",
|
||||
num_threads=detector_config.num_threads or 3,
|
||||
)
|
||||
|
||||
self.interpreter.allocate_tensors()
|
||||
@@ -2,17 +2,30 @@ import logging
|
||||
import numpy as np
|
||||
|
||||
from frigate.detectors.detection_api import DetectionApi
|
||||
from frigate.detectors.detector_config import BaseDetectorConfig
|
||||
from typing import Literal
|
||||
from pydantic import Extra, Field
|
||||
import tflite_runtime.interpreter as tflite
|
||||
from tflite_runtime.interpreter import load_delegate
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DETECTOR_KEY = "edgetpu"
|
||||
|
||||
|
||||
class EdgeTpuDetectorConfig(BaseDetectorConfig):
|
||||
type: Literal[DETECTOR_KEY]
|
||||
device: str = Field(default=None, title="Device Type")
|
||||
|
||||
|
||||
class EdgeTpuTfl(DetectionApi):
|
||||
def __init__(self, det_device=None, model_config=None):
|
||||
type_key = DETECTOR_KEY
|
||||
|
||||
def __init__(self, detector_config: EdgeTpuDetectorConfig):
|
||||
device_config = {"device": "usb"}
|
||||
if not det_device is None:
|
||||
device_config = {"device": det_device}
|
||||
if detector_config.device is not None:
|
||||
device_config = {"device": detector_config.device}
|
||||
|
||||
edge_tpu_delegate = None
|
||||
|
||||
@@ -21,7 +34,7 @@ class EdgeTpuTfl(DetectionApi):
|
||||
edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
|
||||
logger.info("TPU found")
|
||||
self.interpreter = tflite.Interpreter(
|
||||
model_path=model_config.path or "/edgetpu_model.tflite",
|
||||
model_path=detector_config.model.path or "/edgetpu_model.tflite",
|
||||
experimental_delegates=[edge_tpu_delegate],
|
||||
)
|
||||
except ValueError:
|
||||
@@ -3,18 +3,30 @@ import numpy as np
|
||||
import openvino.runtime as ov
|
||||
|
||||
from frigate.detectors.detection_api import DetectionApi
|
||||
from frigate.detectors.detector_config import BaseDetectorConfig
|
||||
from typing import Literal
|
||||
from pydantic import Extra, Field
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DETECTOR_KEY = "openvino"
|
||||
|
||||
|
||||
class OvDetectorConfig(BaseDetectorConfig):
|
||||
type: Literal[DETECTOR_KEY]
|
||||
device: str = Field(default=None, title="Device Type")
|
||||
|
||||
|
||||
class OvDetector(DetectionApi):
|
||||
def __init__(self, det_device=None, model_config=None, num_threads=1):
|
||||
type_key = DETECTOR_KEY
|
||||
|
||||
def __init__(self, detector_config: OvDetectorConfig):
|
||||
self.ov_core = ov.Core()
|
||||
self.ov_model = self.ov_core.read_model(model_config.path)
|
||||
self.ov_model = self.ov_core.read_model(detector_config.model.path)
|
||||
|
||||
self.interpreter = self.ov_core.compile_model(
|
||||
model=self.ov_model, device_name=det_device
|
||||
model=self.ov_model, device_name=detector_config.device
|
||||
)
|
||||
logger.info(f"Model Input Shape: {self.interpreter.input(0).shape}")
|
||||
self.output_indexes = 0
|
||||
Reference in New Issue
Block a user