Ability to manually create events through the API (#3184)

* Move to events package

* Improve handling of external events

* Handle external events in the event queue

* Pass in event processor

* Check event json

* Fix json parsing and change defaults

* Fix snapshot saving

* Hide % score when not available

* Correct docs and add json example

* Save event png db

* Adjust image

* Formatting

* Add catch for failure ending event

* Add init to modules

* Fix naming

* Formatting

* Fix http creation

* fix test

* Change to PUT and include response in docs

* Add ability to set bounding box locations in snapshot

* Support multiple box annotations

* Cleanup docs example response

Co-authored-by: Blake Blackshear <blake@frigate.video>

* Cleanup docs wording

Co-authored-by: Blake Blackshear <blake@frigate.video>

* Store full frame for thumbnail

* Formatting

* Set thumbnail height to 175

* Formatting

---------

Co-authored-by: Blake Blackshear <blake@frigate.video>
This commit is contained in:
Nicolas Mowen
2023-05-19 04:16:11 -06:00
committed by GitHub
parent 6d0c2ec5c8
commit e357715a8c
12 changed files with 466 additions and 180 deletions

View File

176
frigate/events/cleanup.py Normal file
View File

@@ -0,0 +1,176 @@
"""Cleanup events based on configured retention."""
import datetime
import logging
import os
import threading
from pathlib import Path
from peewee import fn
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR
from frigate.models import Event
from multiprocessing.synchronize import Event as MpEvent
logger = logging.getLogger(__name__)
class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "event_cleanup"
self.config = config
self.stop_event = stop_event
self.camera_keys = list(self.config.cameras.keys())
def expire(self, media_type: str) -> None:
# TODO: Refactor media_type to enum
## Expire events from unlisted cameras based on the global config
if media_type == "clips":
retain_config = self.config.record.events.retain
file_extension = "mp4"
update_params = {"has_clip": False}
else:
retain_config = self.config.snapshots.retain
file_extension = "jpg"
update_params = {"has_snapshot": False}
distinct_labels = (
Event.select(Event.label)
.where(Event.camera.not_in(self.camera_keys))
.distinct()
)
# loop over object types in db
for l in distinct_labels:
# get expiration time for this label
expire_days = retain_config.objects.get(l.label, retain_config.default)
expire_after = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# grab all events after specific time
expired_events = Event.select().where(
Event.camera.not_in(self.camera_keys),
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
# delete the media from disk
for event in expired_events:
media_name = f"{event.camera}-{event.id}"
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
)
media_path.unlink(missing_ok=True)
if file_extension == "jpg":
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
)
media_path.unlink(missing_ok=True)
# update the clips attribute for the db entry
update_query = Event.update(update_params).where(
Event.camera.not_in(self.camera_keys),
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
update_query.execute()
## Expire events from cameras based on the camera config
for name, camera in self.config.cameras.items():
if media_type == "clips":
retain_config = camera.record.events.retain
else:
retain_config = camera.snapshots.retain
# get distinct objects in database for this camera
distinct_labels = (
Event.select(Event.label).where(Event.camera == name).distinct()
)
# loop over object types in db
for l in distinct_labels:
# get expiration time for this label
expire_days = retain_config.objects.get(l.label, retain_config.default)
expire_after = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# grab all events after specific time
expired_events = Event.select().where(
Event.camera == name,
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
# delete the grabbed clips from disk
for event in expired_events:
media_name = f"{event.camera}-{event.id}"
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
)
media_path.unlink(missing_ok=True)
if file_extension == "jpg":
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
)
media_path.unlink(missing_ok=True)
# update the clips attribute for the db entry
update_query = Event.update(update_params).where(
Event.camera == name,
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
update_query.execute()
def purge_duplicates(self) -> None:
duplicate_query = """with grouped_events as (
select id,
label,
camera,
has_snapshot,
has_clip,
row_number() over (
partition by label, camera, round(start_time/5,0)*5
order by end_time-start_time desc
) as copy_number
from event
)
select distinct id, camera, has_snapshot, has_clip from grouped_events
where copy_number > 1;"""
duplicate_events = Event.raw(duplicate_query)
for event in duplicate_events:
logger.debug(f"Removing duplicate: {event.id}")
media_name = f"{event.camera}-{event.id}"
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
media_path.unlink(missing_ok=True)
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png")
media_path.unlink(missing_ok=True)
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
media_path.unlink(missing_ok=True)
(
Event.delete()
.where(Event.id << [event.id for event in duplicate_events])
.execute()
)
def run(self) -> None:
# only expire events every 5 minutes
while not self.stop_event.wait(300):
self.expire("clips")
self.expire("snapshots")
self.purge_duplicates()
# drop events from db where has_clip and has_snapshot are false
delete_query = Event.delete().where(
Event.has_clip == False, Event.has_snapshot == False
)
delete_query.execute()
logger.info(f"Exiting event cleanup...")

132
frigate/events/external.py Normal file
View File

@@ -0,0 +1,132 @@
"""Handle external events created by the user."""
import base64
import cv2
import datetime
import glob
import logging
import os
import random
import string
from typing import Optional
from multiprocessing.queues import Queue
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR
from frigate.events.maintainer import EventTypeEnum
from frigate.util import draw_box_with_label
logger = logging.getLogger(__name__)
class ExternalEventProcessor:
def __init__(self, config: FrigateConfig, queue: Queue) -> None:
self.config = config
self.queue = queue
self.default_thumbnail = None
def create_manual_event(
self,
camera: str,
label: str,
sub_label: Optional[str],
duration: Optional[int],
include_recording: bool,
draw: dict[str, any],
snapshot_frame: any,
) -> str:
now = datetime.datetime.now().timestamp()
camera_config = self.config.cameras.get(camera)
# create event id and start frame time
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
thumbnail = self._write_images(
camera_config, label, event_id, draw, snapshot_frame
)
self.queue.put(
(
EventTypeEnum.api,
"new",
camera_config,
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"camera": camera,
"start_time": now,
"end_time": now + duration if duration is not None else None,
"thumbnail": thumbnail,
"has_clip": camera_config.record.enabled and include_recording,
"has_snapshot": True,
},
)
)
return event_id
def finish_manual_event(self, event_id: str) -> None:
"""Finish external event with indeterminate duration."""
now = datetime.datetime.now().timestamp()
self.queue.put(
(EventTypeEnum.api, "end", None, {"id": event_id, "end_time": now})
)
def _write_images(
self,
camera_config: CameraConfig,
label: str,
event_id: str,
draw: dict[str, any],
img_frame: any,
) -> str:
# write clean snapshot if enabled
if camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)
if ret:
with open(
os.path.join(
CLIPS_DIR,
f"{camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
for box in draw.get("boxes"):
x = box["box"][0] * camera_config.detect.width
y = box["box"][1] * camera_config.detect.height
width = box["box"][2] * camera_config.detect.width
height = box["box"][3] * camera_config.detect.height
draw_box_with_label(
img_frame,
x,
y,
x + width,
y + height,
label,
f"{box.get('score', '-')}% {int(width * height)}",
thickness=2,
color=box.get("color", (255, 0, 0)),
)
ret, jpg = cv2.imencode(".jpg", img_frame)
with open(
os.path.join(CLIPS_DIR, f"{camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
ret, jpg = cv2.imencode(".jpg", thumb)
return base64.b64encode(jpg.tobytes()).decode("utf-8")

View File

@@ -0,0 +1,230 @@
import datetime
import logging
import queue
import threading
from enum import Enum
from peewee import fn
from frigate.config import EventsConfig, FrigateConfig
from frigate.models import Event
from frigate.types import CameraMetricsTypes
from frigate.util import to_relative_box
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Dict
logger = logging.getLogger(__name__)
class EventTypeEnum(str, Enum):
api = "api"
# audio = "audio"
tracked_object = "tracked_object"
def should_update_db(prev_event: Event, current_event: Event) -> bool:
"""If current_event has updated fields and (clip or snapshot)."""
if current_event["has_clip"] or current_event["has_snapshot"]:
# if this is the first time has_clip or has_snapshot turned true
if not prev_event["has_clip"] and not prev_event["has_snapshot"]:
return True
# or if any of the following values changed
if (
prev_event["top_score"] != current_event["top_score"]
or prev_event["entered_zones"] != current_event["entered_zones"]
or prev_event["thumbnail"] != current_event["thumbnail"]
or prev_event["end_time"] != current_event["end_time"]
):
return True
return False
class EventProcessor(threading.Thread):
def __init__(
self,
config: FrigateConfig,
camera_processes: dict[str, CameraMetricsTypes],
event_queue: Queue,
event_processed_queue: Queue,
timeline_queue: Queue,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "event_processor"
self.config = config
self.camera_processes = camera_processes
self.event_queue = event_queue
self.event_processed_queue = event_processed_queue
self.timeline_queue = timeline_queue
self.events_in_process: Dict[str, Event] = {}
self.stop_event = stop_event
def run(self) -> None:
# set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where(
Event.end_time == None
).execute()
while not self.stop_event.is_set():
try:
source_type, event_type, camera, event_data = self.event_queue.get(
timeout=1
)
except queue.Empty:
continue
logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
self.timeline_queue.put(
(
camera,
source_type,
event_type,
self.events_in_process.get(event_data["id"]),
event_data,
)
)
if source_type == EventTypeEnum.tracked_object:
if event_type == "start":
self.events_in_process[event_data["id"]] = event_data
continue
self.handle_object_detection(event_type, camera, event_data)
elif source_type == EventTypeEnum.api:
self.handle_external_detection(event_type, event_data)
# set an end_time on events without an end_time before exiting
Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time == None
).execute()
logger.info(f"Exiting event processor...")
def handle_object_detection(
self,
event_type: str,
camera: str,
event_data: Event,
) -> None:
"""handle tracked object event updates."""
# if this is the first message, just store it and continue, its not time to insert it in the db
if should_update_db(self.events_in_process[event_data["id"]], event_data):
camera_config = self.config.cameras[camera]
event_config: EventsConfig = camera_config.record.events
width = camera_config.detect.width
height = camera_config.detect.height
first_detector = list(self.config.detectors.values())[0]
start_time = event_data["start_time"] - event_config.pre_capture
end_time = (
None
if event_data["end_time"] is None
else event_data["end_time"] + event_config.post_capture
)
# score of the snapshot
score = (
None
if event_data["snapshot"] is None
else event_data["snapshot"]["score"]
)
# detection region in the snapshot
region = (
None
if event_data["snapshot"] is None
else to_relative_box(
width,
height,
event_data["snapshot"]["region"],
)
)
# bounding box for the snapshot
box = (
None
if event_data["snapshot"] is None
else to_relative_box(
width,
height,
event_data["snapshot"]["box"],
)
)
# keep these from being set back to false because the event
# may have started while recordings and snapshots were enabled
# this would be an issue for long running events
if self.events_in_process[event_data["id"]]["has_clip"]:
event_data["has_clip"] = True
if self.events_in_process[event_data["id"]]["has_snapshot"]:
event_data["has_snapshot"] = True
event = {
Event.id: event_data["id"],
Event.label: event_data["label"],
Event.camera: camera,
Event.start_time: start_time,
Event.end_time: end_time,
Event.zones: list(event_data["entered_zones"]),
Event.thumbnail: event_data["thumbnail"],
Event.has_clip: event_data["has_clip"],
Event.has_snapshot: event_data["has_snapshot"],
Event.model_hash: first_detector.model.model_hash,
Event.model_type: first_detector.model.model_type,
Event.detector_type: first_detector.type,
Event.data: {
"box": box,
"region": region,
"score": score,
"top_score": event_data["top_score"],
},
}
(
Event.insert(event)
.on_conflict(
conflict_target=[Event.id],
update=event,
)
.execute()
)
# update the stored copy for comparison on future update messages
self.events_in_process[event_data["id"]] = event_data
if event_type == "end":
del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera))
def handle_external_detection(self, type: str, event_data: Event):
if type == "new":
event = {
Event.id: event_data["id"],
Event.label: event_data["label"],
Event.sub_label: event_data["sub_label"],
Event.camera: event_data["camera"],
Event.start_time: event_data["start_time"],
Event.end_time: event_data["end_time"],
Event.thumbnail: event_data["thumbnail"],
Event.has_clip: event_data["has_clip"],
Event.has_snapshot: event_data["has_snapshot"],
Event.zones: [],
Event.data: {},
}
elif type == "end":
event = {
Event.id: event_data["id"],
Event.end_time: event_data["end_time"],
}
try:
(
Event.insert(event)
.on_conflict(
conflict_target=[Event.id],
update=event,
)
.execute()
)
except Exception:
logger.warning(f"Failed to update manual event: {event_data['id']}")