forked from Github/frigate
Add ability to configure genai to use snapshot instead of thumbnails (#14077)
* Allow embedding of snapshot for description via config option * docs * frontend button * Backend * crop snapshot to region * only show dropdown when event has snapshot * fix cursor on dropdown * crop on initial generation as well * use enum for type * fix type
This commit is contained in:
9
frigate/api/defs/regenerate_query_parameters.py
Normal file
9
frigate/api/defs/regenerate_query_parameters.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from frigate.events.types import RegenerateDescriptionEnum
|
||||
|
||||
|
||||
class RegenerateQueryParameters(BaseModel):
|
||||
source: Optional[RegenerateDescriptionEnum] = RegenerateDescriptionEnum.thumbnails
|
||||
@@ -31,6 +31,9 @@ from frigate.api.defs.events_query_parameters import (
|
||||
EventsSearchQueryParams,
|
||||
EventsSummaryQueryParams,
|
||||
)
|
||||
from frigate.api.defs.regenerate_query_parameters import (
|
||||
RegenerateQueryParameters,
|
||||
)
|
||||
from frigate.api.defs.tags import Tags
|
||||
from frigate.const import (
|
||||
CLIPS_DIR,
|
||||
@@ -996,7 +999,9 @@ def set_description(
|
||||
|
||||
|
||||
@router.put("/events/{event_id}/description/regenerate")
|
||||
def regenerate_description(request: Request, event_id: str):
|
||||
def regenerate_description(
|
||||
request: Request, event_id: str, params: RegenerateQueryParameters = Depends()
|
||||
):
|
||||
try:
|
||||
event: Event = Event.get(Event.id == event_id)
|
||||
except DoesNotExist:
|
||||
@@ -1009,7 +1014,7 @@ def regenerate_description(request: Request, event_id: str):
|
||||
request.app.frigate_config.semantic_search.enabled
|
||||
and request.app.frigate_config.genai.enabled
|
||||
):
|
||||
request.app.event_metadata_updater.publish(event.id)
|
||||
request.app.event_metadata_updater.publish((event.id, params.source))
|
||||
|
||||
return JSONResponse(
|
||||
content=(
|
||||
@@ -1017,7 +1022,8 @@ def regenerate_description(request: Request, event_id: str):
|
||||
"success": True,
|
||||
"message": "Event "
|
||||
+ event_id
|
||||
+ " description regeneration has been requested.",
|
||||
+ " description regeneration has been requested using "
|
||||
+ params.source,
|
||||
}
|
||||
),
|
||||
status_code=200,
|
||||
|
||||
@@ -4,6 +4,8 @@ import logging
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from frigate.events.types import RegenerateDescriptionEnum
|
||||
|
||||
from .zmq_proxy import Publisher, Subscriber
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -23,6 +25,9 @@ class EventMetadataPublisher(Publisher):
|
||||
topic = topic.value
|
||||
super().__init__(topic)
|
||||
|
||||
def publish(self, payload: tuple[str, RegenerateDescriptionEnum]) -> None:
|
||||
super().publish(payload)
|
||||
|
||||
|
||||
class EventMetadataSubscriber(Subscriber):
|
||||
"""Simplifies receiving event metadata."""
|
||||
@@ -35,10 +40,12 @@ class EventMetadataSubscriber(Subscriber):
|
||||
|
||||
def check_for_update(
|
||||
self, timeout: float = None
|
||||
) -> Optional[tuple[EventMetadataTypeEnum, any]]:
|
||||
) -> Optional[tuple[EventMetadataTypeEnum, str, RegenerateDescriptionEnum]]:
|
||||
return super().check_for_update(timeout)
|
||||
|
||||
def _return_object(self, topic: str, payload: any) -> any:
|
||||
if payload is None:
|
||||
return (None, None)
|
||||
return (EventMetadataTypeEnum[topic[len(self.topic_base) :]], payload)
|
||||
return (None, None, None)
|
||||
topic = EventMetadataTypeEnum[topic[len(self.topic_base) :]]
|
||||
event_id, source = payload
|
||||
return (topic, event_id, RegenerateDescriptionEnum(source))
|
||||
|
||||
@@ -18,6 +18,9 @@ class GenAIProviderEnum(str, Enum):
|
||||
# uses BaseModel because some global attributes are not available at the camera level
|
||||
class GenAICameraConfig(BaseModel):
|
||||
enabled: bool = Field(default=False, title="Enable GenAI for camera.")
|
||||
use_snapshot: bool = Field(
|
||||
default=False, title="Use snapshots for generating descriptions."
|
||||
)
|
||||
prompt: str = Field(
|
||||
default="Describe the {label} in the sequence of images with as much detail as possible. Do not describe the background.",
|
||||
title="Default caption prompt.",
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import base64
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from multiprocessing.synchronize import Event as MpEvent
|
||||
from typing import Optional
|
||||
@@ -19,7 +20,7 @@ from frigate.comms.event_metadata_updater import (
|
||||
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import UPDATE_EVENT_DESCRIPTION
|
||||
from frigate.const import CLIPS_DIR, UPDATE_EVENT_DESCRIPTION
|
||||
from frigate.events.types import EventTypeEnum
|
||||
from frigate.genai import get_genai_client
|
||||
from frigate.models import Event
|
||||
@@ -136,6 +137,41 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
or set(event.zones) & set(camera_config.genai.required_zones)
|
||||
)
|
||||
):
|
||||
if event.has_snapshot and camera_config.genai.use_snapshot:
|
||||
with open(
|
||||
os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"),
|
||||
"rb",
|
||||
) as image_file:
|
||||
snapshot_image = image_file.read()
|
||||
|
||||
img = cv2.imdecode(
|
||||
np.frombuffer(snapshot_image, dtype=np.int8),
|
||||
cv2.IMREAD_COLOR,
|
||||
)
|
||||
|
||||
# crop snapshot based on region before sending off to genai
|
||||
height, width = img.shape[:2]
|
||||
x1_rel, y1_rel, width_rel, height_rel = event.data["region"]
|
||||
|
||||
x1, y1 = int(x1_rel * width), int(y1_rel * height)
|
||||
cropped_image = img[
|
||||
y1 : y1 + int(height_rel * height),
|
||||
x1 : x1 + int(width_rel * width),
|
||||
]
|
||||
|
||||
_, buffer = cv2.imencode(".jpg", cropped_image)
|
||||
snapshot_image = buffer.tobytes()
|
||||
|
||||
embed_image = (
|
||||
[snapshot_image]
|
||||
if event.has_snapshot and camera_config.genai.use_snapshot
|
||||
else (
|
||||
[thumbnail for data in self.tracked_events[event_id]]
|
||||
if len(self.tracked_events.get(event_id, [])) > 0
|
||||
else [thumbnail]
|
||||
)
|
||||
)
|
||||
|
||||
# Generate the description. Call happens in a thread since it is network bound.
|
||||
threading.Thread(
|
||||
target=self._embed_description,
|
||||
@@ -143,12 +179,7 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
daemon=True,
|
||||
args=(
|
||||
event,
|
||||
[
|
||||
data["thumbnail"]
|
||||
for data in self.tracked_events[event_id]
|
||||
]
|
||||
if len(self.tracked_events.get(event_id, [])) > 0
|
||||
else [thumbnail],
|
||||
embed_image,
|
||||
metadata,
|
||||
),
|
||||
).start()
|
||||
@@ -159,13 +190,15 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
|
||||
def _process_event_metadata(self):
|
||||
# Check for regenerate description requests
|
||||
(topic, event_id) = self.event_metadata_subscriber.check_for_update(timeout=1)
|
||||
(topic, event_id, source) = self.event_metadata_subscriber.check_for_update(
|
||||
timeout=1
|
||||
)
|
||||
|
||||
if topic is None:
|
||||
return
|
||||
|
||||
if event_id:
|
||||
self.handle_regenerate_description(event_id)
|
||||
self.handle_regenerate_description(event_id, source)
|
||||
|
||||
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
|
||||
"""Return jpg thumbnail of a region of the frame."""
|
||||
@@ -228,7 +261,7 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
description,
|
||||
)
|
||||
|
||||
def handle_regenerate_description(self, event_id: str) -> None:
|
||||
def handle_regenerate_description(self, event_id: str, source: str) -> None:
|
||||
try:
|
||||
event: Event = Event.get(Event.id == event_id)
|
||||
except DoesNotExist:
|
||||
@@ -243,4 +276,38 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
metadata = get_metadata(event)
|
||||
thumbnail = base64.b64decode(event.thumbnail)
|
||||
|
||||
self._embed_description(event, [thumbnail], metadata)
|
||||
logger.debug(f"Using ${source} regeneration for ${event}")
|
||||
|
||||
if event.has_snapshot and source == "snapshot":
|
||||
with open(
|
||||
os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"),
|
||||
"rb",
|
||||
) as image_file:
|
||||
snapshot_image = image_file.read()
|
||||
img = cv2.imdecode(
|
||||
np.frombuffer(snapshot_image, dtype=np.int8), cv2.IMREAD_COLOR
|
||||
)
|
||||
|
||||
# crop snapshot based on region before sending off to genai
|
||||
height, width = img.shape[:2]
|
||||
x1_rel, y1_rel, width_rel, height_rel = event.data["region"]
|
||||
|
||||
x1, y1 = int(x1_rel * width), int(y1_rel * height)
|
||||
cropped_image = img[
|
||||
y1 : y1 + int(height_rel * height), x1 : x1 + int(width_rel * width)
|
||||
]
|
||||
|
||||
_, buffer = cv2.imencode(".jpg", cropped_image)
|
||||
snapshot_image = buffer.tobytes()
|
||||
|
||||
embed_image = (
|
||||
[snapshot_image]
|
||||
if event.has_snapshot and source == "snapshot"
|
||||
else (
|
||||
[thumbnail for data in self.tracked_events[event_id]]
|
||||
if len(self.tracked_events.get(event_id, [])) > 0
|
||||
else [thumbnail]
|
||||
)
|
||||
)
|
||||
|
||||
self._embed_description(event, embed_image, metadata)
|
||||
|
||||
@@ -12,3 +12,8 @@ class EventStateEnum(str, Enum):
|
||||
start = "start"
|
||||
update = "update"
|
||||
end = "end"
|
||||
|
||||
|
||||
class RegenerateDescriptionEnum(str, Enum):
|
||||
thumbnails = "thumbnails"
|
||||
snapshot = "snapshot"
|
||||
|
||||
Reference in New Issue
Block a user