Improve face recognition (#15205)

* Validate faces using cosine distance and SVC

* Formatting

* Use opencv instead of face embedding

* Update docs for training data

* Adjust to score system

* Set bounds

* remove face embeddings

* Update writing images

* Add face library page

* Add ability to select file

* Install opencv deps

* Cleanup

* Use different deps

* Move deps

* Cleanup

* Only show face library for desktop

* Implement deleting

* Add ability to upload image

* Add support for uploading images
This commit is contained in:
Nicolas Mowen
2024-11-26 13:41:49 -07:00
parent 9d54beab76
commit 5cf018ca72
15 changed files with 397 additions and 137 deletions

View File

@@ -1,11 +1,14 @@
"""Object classification APIs."""
import logging
import os
from fastapi import APIRouter, Request, UploadFile
from fastapi.responses import JSONResponse
from pathvalidate import sanitize_filename
from frigate.api.defs.tags import Tags
from frigate.const import FACE_DIR
from frigate.embeddings import EmbeddingsContext
logger = logging.getLogger(__name__)
@@ -15,20 +18,18 @@ router = APIRouter(tags=[Tags.events])
@router.get("/faces")
def get_faces():
return JSONResponse(content={"message": "there are faces"})
face_dict: dict[str, list[str]] = {}
for name in os.listdir(FACE_DIR):
face_dict[name] = []
for file in os.listdir(os.path.join(FACE_DIR, name)):
face_dict[name].append(file)
return JSONResponse(status_code=200, content=face_dict)
@router.post("/faces/{name}")
async def register_face(request: Request, name: str, file: UploadFile):
# if not file.content_type.startswith("image"):
# return JSONResponse(
# status_code=400,
# content={
# "success": False,
# "message": "Only an image can be used to register a face.",
# },
# )
context: EmbeddingsContext = request.app.embeddings
context.register_face(name, await file.read())
return JSONResponse(
@@ -37,8 +38,8 @@ async def register_face(request: Request, name: str, file: UploadFile):
)
@router.delete("/faces")
def deregister_faces(request: Request, body: dict = None):
@router.post("/faces/{name}/delete")
def deregister_faces(request: Request, name: str, body: dict = None):
json: dict[str, any] = body or {}
list_of_ids = json.get("ids", "")
@@ -49,7 +50,9 @@ def deregister_faces(request: Request, body: dict = None):
)
context: EmbeddingsContext = request.app.embeddings
context.delete_face_ids(list_of_ids)
context.delete_face_ids(
name, map(lambda file: sanitize_filename(file), list_of_ids)
)
return JSONResponse(
content=({"success": True, "message": "Successfully deleted faces."}),
status_code=200,

View File

@@ -24,7 +24,10 @@ class SemanticSearchConfig(FrigateBaseModel):
class FaceRecognitionConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable face recognition.")
threshold: float = Field(
default=0.9, title="Face similarity score required to be considered a match."
default=170,
title="minimum face distance score required to be considered a match.",
gt=0.0,
le=1.0,
)
min_area: int = Field(
default=500, title="Min area of face box to consider running face recognition."

View File

@@ -29,10 +29,6 @@ class SqliteVecQueueDatabase(SqliteQueueDatabase):
ids = ",".join(["?" for _ in event_ids])
self.execute_sql(f"DELETE FROM vec_descriptions WHERE id IN ({ids})", event_ids)
def delete_embeddings_face(self, face_ids: list[str]) -> None:
ids = ",".join(["?" for _ in face_ids])
self.execute_sql(f"DELETE FROM vec_faces WHERE id IN ({ids})", face_ids)
def drop_embeddings_tables(self) -> None:
self.execute_sql("""
DROP TABLE vec_descriptions;
@@ -40,11 +36,8 @@ class SqliteVecQueueDatabase(SqliteQueueDatabase):
self.execute_sql("""
DROP TABLE vec_thumbnails;
""")
self.execute_sql("""
DROP TABLE vec_faces;
""")
def create_embeddings_tables(self, face_recognition: bool) -> None:
def create_embeddings_tables(self) -> None:
"""Create vec0 virtual table for embeddings"""
self.execute_sql("""
CREATE VIRTUAL TABLE IF NOT EXISTS vec_thumbnails USING vec0(
@@ -58,11 +51,3 @@ class SqliteVecQueueDatabase(SqliteQueueDatabase):
description_embedding FLOAT[768] distance_metric=cosine
);
""")
if face_recognition:
self.execute_sql("""
CREATE VIRTUAL TABLE IF NOT EXISTS vec_faces USING vec0(
id TEXT PRIMARY KEY,
face_embedding FLOAT[512] distance_metric=cosine
);
""")

View File

@@ -14,7 +14,7 @@ from setproctitle import setproctitle
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
from frigate.config import FrigateConfig
from frigate.const import CONFIG_DIR
from frigate.const import CONFIG_DIR, FACE_DIR
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event
from frigate.util.builtin import serialize
@@ -209,8 +209,13 @@ class EmbeddingsContext:
return self.db.execute_sql(sql_query).fetchall()
def delete_face_ids(self, ids: list[str]) -> None:
self.db.delete_embeddings_face(ids)
def delete_face_ids(self, face: str, ids: list[str]) -> None:
folder = os.path.join(FACE_DIR, face)
for id in ids:
file_path = os.path.join(folder, id)
if os.path.isfile(file_path):
os.unlink(file_path)
def update_description(self, event_id: str, description: str) -> None:
self.requestor.send_data(

View File

@@ -3,8 +3,6 @@
import base64
import logging
import os
import random
import string
import time
from numpy import ndarray
@@ -14,7 +12,6 @@ from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import (
CONFIG_DIR,
FACE_DIR,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_MODEL_STATE,
)
@@ -68,7 +65,7 @@ class Embeddings:
self.requestor = InterProcessRequestor()
# Create tables if they don't exist
self.db.create_embeddings_tables(self.config.face_recognition.enabled)
self.db.create_embeddings_tables()
models = [
"jinaai/jina-clip-v1-text_model_fp16.onnx",
@@ -126,22 +123,6 @@ class Embeddings:
device="GPU" if config.semantic_search.model_size == "large" else "CPU",
)
self.face_embedding = None
if self.config.face_recognition.enabled:
self.face_embedding = GenericONNXEmbedding(
model_name="facenet",
model_file="facenet.onnx",
download_urls={
"facenet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facenet.onnx",
"facedet.onnx": "https://github.com/opencv/opencv_zoo/raw/refs/heads/main/models/face_detection_yunet/face_detection_yunet_2023mar_int8.onnx",
},
model_size="large",
model_type=ModelTypeEnum.face,
requestor=self.requestor,
device="GPU",
)
self.lpr_detection_model = None
self.lpr_classification_model = None
self.lpr_recognition_model = None
@@ -277,40 +258,12 @@ class Embeddings:
return embeddings
def embed_face(self, label: str, thumbnail: bytes, upsert: bool = False) -> ndarray:
embedding = self.face_embedding(thumbnail)[0]
if upsert:
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
id = f"{label}-{rand_id}"
# write face to library
folder = os.path.join(FACE_DIR, label)
file = os.path.join(folder, f"{id}.webp")
os.makedirs(folder, exist_ok=True)
# save face image
with open(file, "wb") as output:
output.write(thumbnail)
self.db.execute_sql(
"""
INSERT OR REPLACE INTO vec_faces(id, face_embedding)
VALUES(?, ?)
""",
(id, serialize(embedding)),
)
return embedding
def reindex(self) -> None:
logger.info("Indexing tracked object embeddings...")
self.db.drop_embeddings_tables()
logger.debug("Dropped embeddings tables.")
self.db.create_embeddings_tables(self.config.face_recognition.enabled)
self.db.create_embeddings_tables()
logger.debug("Created embeddings tables.")
# Delete the saved stats file

View File

@@ -3,7 +3,9 @@
import base64
import logging
import os
import random
import re
import string
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
@@ -23,7 +25,12 @@ from frigate.comms.event_metadata_updater import (
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION
from frigate.const import (
CLIPS_DIR,
FACE_DIR,
FRIGATE_LOCALHOST,
UPDATE_EVENT_DESCRIPTION,
)
from frigate.embeddings.lpr.lpr import LicensePlateRecognition
from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
@@ -70,7 +77,9 @@ class EmbeddingMaintainer(threading.Thread):
self.requires_face_detection = "face" not in self.config.objects.all_objects
self.detected_faces: dict[str, float] = {}
self.face_classifier = (
FaceClassificationModel(db) if self.face_recognition_enabled else None
FaceClassificationModel(self.config.face_recognition, db)
if self.face_recognition_enabled
else None
)
# create communication for updating event descriptions
@@ -145,12 +154,14 @@ class EmbeddingMaintainer(threading.Thread):
if not self.face_recognition_enabled:
return False
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
label = data["face_name"]
id = f"{label}-{rand_id}"
if data.get("cropped"):
self.embeddings.embed_face(
data["face_name"],
base64.b64decode(data["image"]),
upsert=True,
)
pass
else:
img = cv2.imdecode(
np.frombuffer(
@@ -164,12 +175,18 @@ class EmbeddingMaintainer(threading.Thread):
return False
face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
ret, webp = cv2.imencode(
ret, thumbnail = cv2.imencode(
".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
self.embeddings.embed_face(
data["face_name"], webp.tobytes(), upsert=True
)
# write face to library
folder = os.path.join(FACE_DIR, label)
file = os.path.join(folder, f"{id}.webp")
os.makedirs(folder, exist_ok=True)
# save face image
with open(file, "wb") as output:
output.write(thumbnail.tobytes())
self.face_classifier.clear_classifier()
return True
@@ -202,7 +219,9 @@ class EmbeddingMaintainer(threading.Thread):
# Create our own thumbnail based on the bounding box and the frame time
try:
yuv_frame = self.frame_manager.get(frame_name, camera_config.frame_shape_yuv)
yuv_frame = self.frame_manager.get(
frame_name, camera_config.frame_shape_yuv
)
except FileNotFoundError:
pass
@@ -479,16 +498,7 @@ class EmbeddingMaintainer(threading.Thread):
),
]
ret, webp = cv2.imencode(
".webp", face_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
if not ret:
logger.debug("Not processing face due to error creating cropped image.")
return
embedding = self.embeddings.embed_face("unknown", webp.tobytes(), upsert=False)
res = self.face_classifier.classify_face(embedding)
res = self.face_classifier.classify_face(face_frame)
if not res:
return
@@ -499,11 +509,9 @@ class EmbeddingMaintainer(threading.Thread):
f"Detected best face for person as: {sub_label} with score {score}"
)
if score < self.config.face_recognition.threshold or (
id in self.detected_faces and score <= self.detected_faces[id]
):
if id in self.detected_faces and score <= self.detected_faces[id]:
logger.debug(
f"Recognized face score {score} is less than threshold ({self.config.face_recognition.threshold}) / previous face score ({self.detected_faces.get(id)})."
f"Recognized face distance {score} is less than previous face distance ({self.detected_faces.get(id)})."
)
return

View File

@@ -4,13 +4,12 @@ import logging
import os
from typing import Any, Optional
import cv2
import numpy as np
import onnxruntime as ort
from playhouse.sqliteq import SqliteQueueDatabase
from sklearn.preprocessing import LabelEncoder, Normalizer
from sklearn.svm import SVC
from frigate.util.builtin import deserialize
from frigate.config.semantic_search import FaceRecognitionConfig
try:
import openvino as ov
@@ -21,6 +20,9 @@ except ImportError:
logger = logging.getLogger(__name__)
MIN_MATCHING_FACES = 2
def get_ort_providers(
force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
) -> tuple[list[str], list[dict[str, any]]]:
@@ -157,38 +159,42 @@ class ONNXModelRunner:
class FaceClassificationModel:
def __init__(self, db: SqliteQueueDatabase):
def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
self.config = config
self.db = db
self.labeler: Optional[LabelEncoder] = None
self.classifier: Optional[SVC] = None
self.recognizer = cv2.face.LBPHFaceRecognizer_create(radius=4, threshold=(1 - config.threshold) * 1000)
self.label_map: dict[int, str] = {}
def __build_classifier(self) -> None:
faces: list[tuple[str, bytes]] = self.db.execute_sql(
"SELECT id, face_embedding FROM vec_faces"
).fetchall()
embeddings = np.array([deserialize(f[1]) for f in faces])
self.labeler = LabelEncoder()
norms = Normalizer(norm="l2").transform(embeddings)
labels = self.labeler.fit_transform([f[0].split("-")[0] for f in faces])
self.classifier = SVC(kernel="linear", probability=True)
self.classifier.fit(norms, labels)
labels = []
faces = []
dir = "/media/frigate/clips/faces"
for idx, name in enumerate(os.listdir(dir)):
self.label_map[idx] = name
face_folder = os.path.join(dir, name)
for image in os.listdir(face_folder):
img = cv2.imread(os.path.join(face_folder, image))
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
equ = cv2.equalizeHist(gray)
faces.append(equ)
labels.append(idx)
self.recognizer.train(faces, np.array(labels))
def clear_classifier(self) -> None:
self.classifier = None
self.labeler = None
def classify_face(self, embedding: np.ndarray) -> Optional[tuple[str, float]]:
if not self.classifier:
def classify_face(self, face_image: np.ndarray) -> Optional[tuple[str, float]]:
if not self.label_map:
self.__build_classifier()
res = self.classifier.predict([embedding])
index, distance = self.recognizer.predict(cv2.equalizeHist(cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)))
if res is None:
if index == -1:
return None
label = res[0]
probabilities = self.classifier.predict_proba([embedding])[0]
return (
self.labeler.inverse_transform([label])[0],
round(probabilities[label], 2),
)
score = 1.0 - (distance / 1000)
return self.label_map[index], round(score, 2)