forked from Github/frigate
create typed config classes
This commit is contained in:
@@ -13,6 +13,7 @@ import itertools
|
||||
import matplotlib.pyplot as plt
|
||||
from frigate.util import draw_box_with_label, SharedMemoryFrameManager
|
||||
from frigate.edgetpu import load_labels
|
||||
from frigate.config import CameraConfig
|
||||
from typing import Callable, Dict
|
||||
from statistics import mean, median
|
||||
|
||||
@@ -33,16 +34,16 @@ def zone_filtered(obj, object_config):
|
||||
|
||||
# if the min area is larger than the
|
||||
# detected object, don't add it to detected objects
|
||||
if obj_settings.get('min_area',-1) > obj['area']:
|
||||
if obj_settings.min_area > obj['area']:
|
||||
return True
|
||||
|
||||
# if the detected object is larger than the
|
||||
# max area, don't add it to detected objects
|
||||
if obj_settings.get('max_area', 24000000) < obj['area']:
|
||||
if obj_settings.max_area < obj['area']:
|
||||
return True
|
||||
|
||||
# if the score is lower than the threshold, skip
|
||||
if obj_settings.get('threshold', 0) > obj['computed_score']:
|
||||
if obj_settings.threshold > obj['computed_score']:
|
||||
return True
|
||||
|
||||
return False
|
||||
@@ -58,7 +59,7 @@ class CameraState():
|
||||
self.object_status = defaultdict(lambda: 'OFF')
|
||||
self.tracked_objects = {}
|
||||
self.zone_objects = defaultdict(lambda: [])
|
||||
self._current_frame = np.zeros((self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1]), np.uint8)
|
||||
self._current_frame = np.zeros(self.config.frame_shape_yuv, np.uint8)
|
||||
self.current_frame_lock = threading.Lock()
|
||||
self.current_frame_time = 0.0
|
||||
self.previous_frame_id = None
|
||||
@@ -89,14 +90,14 @@ class CameraState():
|
||||
region = obj['region']
|
||||
cv2.rectangle(frame_copy, (region[0], region[1]), (region[2], region[3]), (0,255,0), 1)
|
||||
|
||||
if self.config['snapshots']['show_timestamp']:
|
||||
if self.config.snapshots.show_timestamp:
|
||||
time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
|
||||
cv2.putText(frame_copy, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
|
||||
|
||||
if self.config['snapshots']['draw_zones']:
|
||||
for name, zone in self.config['zones'].items():
|
||||
if self.config.snapshots.draw_zones:
|
||||
for name, zone in self.config.zones.items():
|
||||
thickness = 8 if any([name in obj['zones'] for obj in tracked_objects.values()]) else 2
|
||||
cv2.drawContours(frame_copy, [zone['contour']], -1, zone['color'], thickness)
|
||||
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
|
||||
|
||||
return frame_copy
|
||||
|
||||
@@ -105,7 +106,7 @@ class CameraState():
|
||||
if not obj.get('false_positive', True):
|
||||
return False
|
||||
|
||||
threshold = self.config['objects'].get('filters', {}).get(obj['label'], {}).get('threshold', 0.85)
|
||||
threshold = self.config.objects.filters[obj['label']].threshold
|
||||
if obj['computed_score'] < threshold:
|
||||
return True
|
||||
return False
|
||||
@@ -124,7 +125,7 @@ class CameraState():
|
||||
self.current_frame_time = frame_time
|
||||
# get the new frame and delete the old frame
|
||||
frame_id = f"{self.name}{frame_time}"
|
||||
current_frame = self.frame_manager.get(frame_id, (self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1]))
|
||||
current_frame = self.frame_manager.get(frame_id, self.config.frame_shape_yuv)
|
||||
|
||||
current_ids = tracked_objects.keys()
|
||||
previous_ids = self.tracked_objects.keys()
|
||||
@@ -184,12 +185,12 @@ class CameraState():
|
||||
current_zones = []
|
||||
bottom_center = (obj['centroid'][0], obj['box'][3])
|
||||
# check each zone
|
||||
for name, zone in self.config['zones'].items():
|
||||
contour = zone['contour']
|
||||
for name, zone in self.config.zones.items():
|
||||
contour = zone.contour
|
||||
# check if the object is in the zone
|
||||
if (cv2.pointPolygonTest(contour, bottom_center, False) >= 0):
|
||||
# if the object passed the filters once, dont apply again
|
||||
if name in obj.get('zones', []) or not zone_filtered(obj, zone.get('filters', {})):
|
||||
if name in obj.get('zones', []) or not zone_filtered(obj, zone.filters):
|
||||
current_zones.append(name)
|
||||
obj['entered_zones'].add(name)
|
||||
|
||||
@@ -208,7 +209,7 @@ class CameraState():
|
||||
now = datetime.datetime.now().timestamp()
|
||||
# if the object is a higher score than the current best score
|
||||
# or the current object is older than desired, use the new object
|
||||
if obj_copy['score'] > current_best['score'] or (now - current_best['frame_time']) > self.config.get('best_image_timeout', 60):
|
||||
if obj_copy['score'] > current_best['score'] or (now - current_best['frame_time']) > self.config.best_image_timeout:
|
||||
obj_copy['frame'] = np.copy(current_frame)
|
||||
self.best_objects[object_type] = obj_copy
|
||||
for c in self.callbacks['snapshot']:
|
||||
@@ -249,7 +250,7 @@ class CameraState():
|
||||
self.previous_frame_id = frame_id
|
||||
|
||||
class TrackedObjectProcessor(threading.Thread):
|
||||
def __init__(self, camera_config, client, topic_prefix, tracked_objects_queue, event_queue, stop_event):
|
||||
def __init__(self, camera_config: Dict[str, CameraConfig], client, topic_prefix, tracked_objects_queue, event_queue, stop_event):
|
||||
threading.Thread.__init__(self)
|
||||
self.camera_config = camera_config
|
||||
self.client = client
|
||||
@@ -296,22 +297,22 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
return
|
||||
|
||||
best_frame = cv2.cvtColor(obj['frame'], cv2.COLOR_YUV2BGR_I420)
|
||||
if self.camera_config[camera]['snapshots']['draw_bounding_boxes']:
|
||||
if self.camera_config[camera].snapshots.draw_bounding_boxes:
|
||||
thickness = 2
|
||||
color = COLOR_MAP[obj['label']]
|
||||
box = obj['box']
|
||||
draw_box_with_label(best_frame, box[0], box[1], box[2], box[3], obj['label'], f"{int(obj['score']*100)}% {int(obj['area'])}", thickness=thickness, color=color)
|
||||
|
||||
mqtt_config = self.camera_config[camera].get('mqtt', {'crop_to_region': False})
|
||||
if mqtt_config.get('crop_to_region'):
|
||||
mqtt_config = self.camera_config[camera].mqtt
|
||||
if mqtt_config.crop_to_region:
|
||||
region = obj['region']
|
||||
best_frame = best_frame[region[1]:region[3], region[0]:region[2]]
|
||||
if 'snapshot_height' in mqtt_config:
|
||||
height = int(mqtt_config['snapshot_height'])
|
||||
if mqtt_config.snapshot_height:
|
||||
height = mqtt_config.snapshot_height
|
||||
width = int(height*best_frame.shape[1]/best_frame.shape[0])
|
||||
best_frame = cv2.resize(best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
|
||||
|
||||
if self.camera_config[camera]['snapshots']['show_timestamp']:
|
||||
if self.camera_config[camera].snapshots.show_timestamp:
|
||||
time_to_show = datetime.datetime.fromtimestamp(obj['frame_time']).strftime("%m/%d/%Y %H:%M:%S")
|
||||
size = cv2.getTextSize(time_to_show, cv2.FONT_HERSHEY_SIMPLEX, fontScale=1, thickness=2)
|
||||
text_width = size[0][0]
|
||||
@@ -351,26 +352,6 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
# }
|
||||
# }
|
||||
self.zone_data = defaultdict(lambda: defaultdict(lambda: set()))
|
||||
|
||||
# set colors for zones
|
||||
all_zone_names = set([zone for config in self.camera_config.values() for zone in config['zones'].keys()])
|
||||
zone_colors = {}
|
||||
colors = plt.cm.get_cmap('tab10', len(all_zone_names))
|
||||
for i, zone in enumerate(all_zone_names):
|
||||
zone_colors[zone] = tuple(int(round(255 * c)) for c in colors(i)[:3])
|
||||
|
||||
# create zone contours
|
||||
for camera_config in self.camera_config.values():
|
||||
for zone_name, zone_config in camera_config['zones'].items():
|
||||
zone_config['color'] = zone_colors[zone_name]
|
||||
coordinates = zone_config['coordinates']
|
||||
if isinstance(coordinates, list):
|
||||
zone_config['contour'] = np.array([[int(p.split(',')[0]), int(p.split(',')[1])] for p in coordinates])
|
||||
elif isinstance(coordinates, str):
|
||||
points = coordinates.split(',')
|
||||
zone_config['contour'] = np.array([[int(points[i]), int(points[i+1])] for i in range(0, len(points), 2)])
|
||||
else:
|
||||
print(f"Unable to parse zone coordinates for {zone_name} - {camera}")
|
||||
|
||||
def get_best(self, camera, label):
|
||||
best_objects = self.camera_states[camera].best_objects
|
||||
@@ -398,7 +379,7 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
camera_state.update(frame_time, current_tracked_objects)
|
||||
|
||||
# update zone status for each label
|
||||
for zone in camera_state.config['zones'].keys():
|
||||
for zone in camera_state.config.zones.keys():
|
||||
# get labels for current camera and all labels in current zone
|
||||
labels_for_camera = set([obj['label'] for obj in camera_state.tracked_objects.values() if zone in obj['zones'] and not obj['false_positive']])
|
||||
labels_to_check = labels_for_camera | set(self.zone_data[zone].keys())
|
||||
|
||||
Reference in New Issue
Block a user