forked from Github/frigate
License plate recognition (ALPR) backend (#14564)
* Update version * Face recognition backend (#14495) * Add basic config and face recognition table * Reconfigure updates processing to handle face * Crop frame to face box * Implement face embedding calculation * Get matching face embeddings * Add support face recognition based on existing faces * Use arcface face embeddings instead of generic embeddings model * Add apis for managing faces * Implement face uploading API * Build out more APIs * Add min area config * Handle larger images * Add more debug logs * fix calculation * Reduce timeout * Small tweaks * Use webp images * Use facenet model * Improve face recognition (#14537) * Increase requirements for face to be set * Manage faces properly * Add basic docs * Simplify * Separate out face recognition frome semantic search * Update docs * Formatting * Fix access (#14540) * Face detection (#14544) * Add support for face detection * Add support for detecting faces during registration * Set body size to be larger * Undo * Update version * Face recognition backend (#14495) * Add basic config and face recognition table * Reconfigure updates processing to handle face * Crop frame to face box * Implement face embedding calculation * Get matching face embeddings * Add support face recognition based on existing faces * Use arcface face embeddings instead of generic embeddings model * Add apis for managing faces * Implement face uploading API * Build out more APIs * Add min area config * Handle larger images * Add more debug logs * fix calculation * Reduce timeout * Small tweaks * Use webp images * Use facenet model * Improve face recognition (#14537) * Increase requirements for face to be set * Manage faces properly * Add basic docs * Simplify * Separate out face recognition frome semantic search * Update docs * Formatting * Fix access (#14540) * Face detection (#14544) * Add support for face detection * Add support for detecting faces during registration * Set body size to be larger * Undo * initial foundation for alpr with paddleocr * initial foundation for alpr with paddleocr * initial foundation for alpr with paddleocr * config * config * lpr maintainer * clean up * clean up * fix processing * don't process for stationary cars * fix order * fixes * check for known plates * improved length and character by character confidence * model fixes and small tweaks * docs * placeholder for non frigate+ model lp detection --------- Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
This commit is contained in:
committed by
Nicolas Mowen
parent
8dc9c2c9ed
commit
3ed4fb87ef
@@ -23,6 +23,7 @@ from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscrib
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION
|
||||
from frigate.embeddings.alpr.alpr import LicensePlateRecognition
|
||||
from frigate.events.types import EventTypeEnum
|
||||
from frigate.genai import get_genai_client
|
||||
from frigate.models import Event
|
||||
@@ -74,6 +75,18 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
self.tracked_events: dict[str, list[any]] = {}
|
||||
self.genai_client = get_genai_client(config)
|
||||
|
||||
# set license plate recognition conditions
|
||||
self.lpr_config = self.config.lpr
|
||||
self.requires_license_plate_detection = (
|
||||
"license_plate" not in self.config.model.all_attributes
|
||||
)
|
||||
self.detected_license_plates: dict[str, dict[str, any]] = {}
|
||||
|
||||
if self.lpr_config.enabled:
|
||||
self.license_plate_recognition = LicensePlateRecognition(
|
||||
self.lpr_config, self.requestor, self.embeddings
|
||||
)
|
||||
|
||||
@property
|
||||
def face_detector(self) -> cv2.FaceDetectorYN:
|
||||
# Lazily create the classifier.
|
||||
@@ -172,8 +185,12 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
|
||||
camera_config = self.config.cameras[camera]
|
||||
|
||||
# no need to process updated objects if face recognition and genai are disabled
|
||||
if not camera_config.genai.enabled and not self.face_recognition_enabled:
|
||||
# no need to process updated objects if face recognition, lpr, genai are disabled
|
||||
if (
|
||||
not camera_config.genai.enabled
|
||||
and not self.face_recognition_enabled
|
||||
and not self.lpr_config.enabled
|
||||
):
|
||||
return
|
||||
|
||||
# Create our own thumbnail based on the bounding box and the frame time
|
||||
@@ -191,6 +208,9 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
if self.face_recognition_enabled:
|
||||
self._process_face(data, yuv_frame)
|
||||
|
||||
if self.lpr_config.enabled:
|
||||
self._process_license_plate(data, yuv_frame)
|
||||
|
||||
# no need to save our own thumbnails if genai is not enabled
|
||||
# or if the object has become stationary
|
||||
if self.genai_client is not None and not data["stationary"]:
|
||||
@@ -222,6 +242,9 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
if event_id in self.detected_faces:
|
||||
self.detected_faces.pop(event_id)
|
||||
|
||||
if event_id in self.detected_license_plates:
|
||||
self.detected_license_plates.pop(event_id)
|
||||
|
||||
if updated_db:
|
||||
try:
|
||||
event: Event = Event.get(Event.id == event_id)
|
||||
@@ -497,6 +520,181 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
if resp.status_code == 200:
|
||||
self.detected_faces[id] = avg_score
|
||||
|
||||
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
|
||||
"""Return the dimensions of the input image as [x, y, width, height]."""
|
||||
height, width = input.shape[:2]
|
||||
return (0, 0, width, height)
|
||||
|
||||
def _process_license_plate(
|
||||
self, obj_data: dict[str, any], frame: np.ndarray
|
||||
) -> None:
|
||||
"""Look for license plates in image."""
|
||||
id = obj_data["id"]
|
||||
|
||||
# don't run for non car objects
|
||||
if obj_data.get("label") != "car":
|
||||
logger.debug("Not a processing license plate for non car object.")
|
||||
return
|
||||
|
||||
# don't run for stationary car objects
|
||||
if obj_data.get("stationary") == True:
|
||||
logger.debug("Not a processing license plate for a stationary car object.")
|
||||
return
|
||||
|
||||
# don't overwrite sub label for objects that have a sub label
|
||||
# that is not a license plate
|
||||
if obj_data.get("sub_label") and id not in self.detected_license_plates:
|
||||
logger.debug(
|
||||
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
|
||||
)
|
||||
return
|
||||
|
||||
license_plate: Optional[dict[str, any]] = None
|
||||
|
||||
if self.requires_license_plate_detection:
|
||||
logger.debug("Running manual license_plate detection.")
|
||||
car_box = obj_data.get("box")
|
||||
|
||||
if not car_box:
|
||||
return None
|
||||
|
||||
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
|
||||
left, top, right, bottom = car_box
|
||||
car = rgb[top:bottom, left:right]
|
||||
license_plate = self._detect_license_plate(car)
|
||||
|
||||
if not license_plate:
|
||||
logger.debug("Detected no license plates for car object.")
|
||||
return
|
||||
|
||||
license_plate_frame = car[
|
||||
license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
|
||||
]
|
||||
license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR)
|
||||
else:
|
||||
# don't run for object without attributes
|
||||
if not obj_data.get("current_attributes"):
|
||||
logger.debug("No attributes to parse.")
|
||||
return
|
||||
|
||||
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
|
||||
for attr in attributes:
|
||||
if attr.get("label") != "license_plate":
|
||||
continue
|
||||
|
||||
if license_plate is None or attr.get("score", 0.0) > license_plate.get(
|
||||
"score", 0.0
|
||||
):
|
||||
license_plate = attr
|
||||
|
||||
# no license plates detected in this frame
|
||||
if not license_plate:
|
||||
return
|
||||
|
||||
license_plate_box = license_plate.get("box")
|
||||
|
||||
# check that license plate is valid
|
||||
if (
|
||||
not license_plate_box
|
||||
or area(license_plate_box) < self.config.lpr.min_area
|
||||
):
|
||||
logger.debug(f"Invalid license plate box {license_plate}")
|
||||
return
|
||||
|
||||
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
|
||||
license_plate_frame = license_plate_frame[
|
||||
license_plate_box[1] : license_plate_box[3],
|
||||
license_plate_box[0] : license_plate_box[2],
|
||||
]
|
||||
|
||||
# run detection, returns results sorted by confidence, best first
|
||||
license_plates, confidences, areas = (
|
||||
self.license_plate_recognition.process_license_plate(license_plate_frame)
|
||||
)
|
||||
|
||||
logger.debug(f"Text boxes: {license_plates}")
|
||||
logger.debug(f"Confidences: {confidences}")
|
||||
logger.debug(f"Areas: {areas}")
|
||||
|
||||
if license_plates:
|
||||
for plate, confidence, text_area in zip(license_plates, confidences, areas):
|
||||
avg_confidence = (
|
||||
(sum(confidence) / len(confidence)) if confidence else 0
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
|
||||
)
|
||||
else:
|
||||
# no plates found
|
||||
logger.debug("No text detected")
|
||||
return
|
||||
|
||||
top_plate, top_char_confidences = license_plates[0], confidences[0]
|
||||
avg_confidence = sum(top_char_confidences) / len(top_char_confidences)
|
||||
|
||||
# Check if we have a previously detected plate for this ID
|
||||
if id in self.detected_license_plates:
|
||||
prev_plate = self.detected_license_plates[id]["plate"]
|
||||
prev_char_confidences = self.detected_license_plates[id]["char_confidences"]
|
||||
prev_avg_confidence = sum(prev_char_confidences) / len(
|
||||
prev_char_confidences
|
||||
)
|
||||
|
||||
# Define conditions for keeping the previous plate
|
||||
shorter_than_previous = len(top_plate) < len(prev_plate)
|
||||
lower_avg_confidence = avg_confidence <= prev_avg_confidence
|
||||
|
||||
# Compare character-by-character confidence where possible
|
||||
min_length = min(len(top_plate), len(prev_plate))
|
||||
char_confidence_comparison = sum(
|
||||
1
|
||||
for i in range(min_length)
|
||||
if top_char_confidences[i] <= prev_char_confidences[i]
|
||||
)
|
||||
worse_char_confidences = char_confidence_comparison >= min_length / 2
|
||||
|
||||
if shorter_than_previous or (
|
||||
lower_avg_confidence and worse_char_confidences
|
||||
):
|
||||
logger.debug(
|
||||
f"Keeping previous plate. New plate stats: "
|
||||
f"length={len(top_plate)}, avg_conf={avg_confidence:.2f} "
|
||||
f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}"
|
||||
)
|
||||
return
|
||||
|
||||
# Check against minimum confidence threshold
|
||||
if avg_confidence < self.lpr_config.threshold:
|
||||
logger.debug(
|
||||
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.threshold})"
|
||||
)
|
||||
return
|
||||
|
||||
# Determine subLabel based on known plates
|
||||
# Default to the detected plate, use label name if there's a match
|
||||
sub_label = top_plate
|
||||
for label, plates in self.lpr_config.known_plates.items():
|
||||
if top_plate in plates:
|
||||
sub_label = label
|
||||
break
|
||||
|
||||
# Send the result to the API
|
||||
resp = requests.post(
|
||||
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
|
||||
json={
|
||||
"camera": obj_data.get("camera"),
|
||||
"subLabel": sub_label,
|
||||
"subLabelScore": avg_confidence,
|
||||
},
|
||||
)
|
||||
|
||||
if resp.status_code == 200:
|
||||
self.detected_license_plates[id] = {
|
||||
"plate": top_plate,
|
||||
"char_confidences": top_char_confidences,
|
||||
}
|
||||
|
||||
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
|
||||
"""Return jpg thumbnail of a region of the frame."""
|
||||
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
|
||||
|
||||
Reference in New Issue
Block a user