Face recognition backend (#14495)

* Add basic config and face recognition table

* Reconfigure updates processing to handle face

* Crop frame to face box

* Implement face embedding calculation

* Get matching face embeddings

* Add support face recognition based on existing faces

* Use arcface face embeddings instead of generic embeddings model

* Add apis for managing faces

* Implement face uploading API

* Build out more APIs

* Add min area config

* Handle larger images

* Add more debug logs

* fix calculation

* Reduce timeout

* Small tweaks

* Use webp images

* Use facenet model
This commit is contained in:
Nicolas Mowen
2024-10-22 16:05:48 -06:00
parent f16f6d3789
commit ca5711d1ab
13 changed files with 365 additions and 45 deletions

View File

@@ -10,6 +10,7 @@ from typing import Optional
import cv2
import numpy as np
import requests
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
@@ -21,13 +22,13 @@ from frigate.comms.event_metadata_updater import (
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, UPDATE_EVENT_DESCRIPTION
from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION
from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
from frigate.models import Event
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import SharedMemoryFrameManager, calculate_region
from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
from .embeddings import Embeddings
@@ -60,10 +61,17 @@ class EmbeddingMaintainer(threading.Thread):
)
self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager()
# set face recognition conditions
self.face_recognition_enabled = (
self.config.semantic_search.face_recognition.enabled
)
self.requires_face_detection = "face" not in self.config.model.all_attributes
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
self.stop_event = stop_event
self.tracked_events = {}
self.tracked_events: dict[str, list[any]] = {}
self.genai_client = get_genai_client(config)
def run(self) -> None:
@@ -103,6 +111,13 @@ class EmbeddingMaintainer(threading.Thread):
return serialize(
self.embeddings.text_embedding([data])[0], pack=False
)
elif topic == EmbeddingsRequestEnum.register_face.value:
self.embeddings.embed_face(
data["face_name"],
base64.b64decode(data["image"]),
upsert=True,
)
return None
except Exception as e:
logger.error(f"Unable to handle embeddings request {e}")
@@ -110,7 +125,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_updates(self) -> None:
"""Process event updates"""
update = self.event_subscriber.check_for_update(timeout=0.1)
update = self.event_subscriber.check_for_update(timeout=0.01)
if update is None:
return
@@ -121,42 +136,47 @@ class EmbeddingMaintainer(threading.Thread):
return
camera_config = self.config.cameras[camera]
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
if (
not camera_config.genai.enabled
or self.genai_client is None
or data["stationary"]
):
return
if data["id"] not in self.tracked_events:
self.tracked_events[data["id"]] = []
# no need to process updated objects if face recognition and genai are disabled
if not camera_config.genai.enabled and not self.face_recognition_enabled:
return
# Create our own thumbnail based on the bounding box and the frame time
try:
yuv_frame = self.frame_manager.get(
frame_name, camera_config.frame_shape_yuv
)
if yuv_frame is not None:
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
# Limit the number of thumbnails saved
if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
# Always keep the first thumbnail for the event
self.tracked_events[data["id"]].pop(1)
self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_name)
yuv_frame = self.frame_manager.get(frame_name, camera_config.frame_shape_yuv)
except FileNotFoundError:
pass
if yuv_frame is None:
logger.debug(
"Unable to process object update because frame is unavailable."
)
return
if self.face_recognition_enabled:
self._process_face(data, yuv_frame)
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
if self.genai_client is not None and not data["stationary"]:
if data["id"] not in self.tracked_events:
self.tracked_events[data["id"]] = []
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
# Limit the number of thumbnails saved
if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
# Always keep the first thumbnail for the event
self.tracked_events[data["id"]].pop(1)
self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_name)
def _process_finalized(self) -> None:
"""Process the end of an event."""
while True:
ended = self.event_end_subscriber.check_for_update(timeout=0.1)
ended = self.event_end_subscriber.check_for_update(timeout=0.01)
if ended == None:
break
@@ -277,7 +297,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_event_metadata(self):
# Check for regenerate description requests
(topic, event_id, source) = self.event_metadata_subscriber.check_for_update(
timeout=0.1
timeout=0.01
)
if topic is None:
@@ -286,6 +306,94 @@ class EmbeddingMaintainer(threading.Thread):
if event_id:
self.handle_regenerate_description(event_id, source)
def _search_face(self, query_embedding: bytes) -> list:
"""Search for the face most closely matching the embedding."""
sql_query = """
SELECT
id,
distance
FROM vec_faces
WHERE face_embedding MATCH ?
AND k = 10 ORDER BY distance
"""
return self.embeddings.db.execute_sql(sql_query, [query_embedding]).fetchall()
def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
"""Look for faces in image."""
# don't run for non person objects
if obj_data.get("label") != "person":
logger.debug("Not a processing face for non person object.")
return
# don't overwrite sub label for objects that have one
if obj_data.get("sub_label"):
logger.debug(
f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
)
return
face: Optional[dict[str, any]] = None
if self.requires_face_detection:
# TODO run cv2 face detection
pass
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "face":
continue
if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
face = attr
# no faces detected in this frame
if not face:
return
face_box = face.get("box")
# check that face is valid
if (
not face_box
or area(face_box) < self.config.semantic_search.face_recognition.min_area
):
logger.debug(f"Invalid face box {face}")
return
face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
face_frame = face_frame[face_box[1] : face_box[3], face_box[0] : face_box[2]]
ret, jpg = cv2.imencode(
".webp", face_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
if not ret:
logger.debug("Not processing face due to error creating cropped image.")
return
embedding = self.embeddings.embed_face("unknown", jpg.tobytes(), upsert=False)
query_embedding = serialize(embedding)
best_faces = self._search_face(query_embedding)
logger.debug(f"Detected best faces for person as: {best_faces}")
if not best_faces:
return
sub_label = str(best_faces[0][0]).split("-")[0]
score = 1.0 - best_faces[0][1]
if score < self.config.semantic_search.face_recognition.threshold:
return None
requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{obj_data['id']}/sub_label",
json={"subLabel": sub_label, "subLabelScore": score},
)
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame."""
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)