Abstract MQTT from communication and make mqtt optional (#4462)

* Add option for mqtt config

* Setup communication layer

* Have a dispatcher which is responsible for handling and sending messages

* Move mqtt to communication

* Separate ws communications module

* Make ws client conform to communicator

* Cleanup imports

* Migrate to new dispatcher

* Clean up

* Need to set topic prefix

* Remove references to mqtt in dispatcher

* Don't start mqtt until dispatcher is subscribed

* Cleanup

* Shorten package

* Formatting

* Remove unused

* Cleanup

* Rename mqtt to ws on web

* Fix ws mypy

* Fix mypy

* Reformat

* Cleanup if/else chain

* Catch bad set commands
This commit is contained in:
Nicolas Mowen
2022-11-23 19:03:20 -07:00
committed by GitHub
parent 370276a7b6
commit 6c0978498d
23 changed files with 594 additions and 560 deletions

View File

@@ -12,6 +12,7 @@ from typing import Callable
import cv2
import numpy as np
from frigate.comms.dispatcher import Dispatcher
from frigate.config import (
CameraConfig,
MqttConfig,
@@ -20,7 +21,6 @@ from frigate.config import (
FrigateConfig,
)
from frigate.const import CLIPS_DIR
from frigate.mqtt import FrigateMqttClient
from frigate.util import (
SharedMemoryFrameManager,
calculate_region,
@@ -633,7 +633,7 @@ class TrackedObjectProcessor(threading.Thread):
def __init__(
self,
config: FrigateConfig,
client: FrigateMqttClient,
dispatcher: Dispatcher,
topic_prefix,
tracked_objects_queue,
event_queue,
@@ -645,7 +645,7 @@ class TrackedObjectProcessor(threading.Thread):
threading.Thread.__init__(self)
self.name = "detected_frames_processor"
self.config = config
self.client = client
self.dispatcher = dispatcher
self.topic_prefix = topic_prefix
self.tracked_objects_queue = tracked_objects_queue
self.event_queue = event_queue
@@ -669,7 +669,7 @@ class TrackedObjectProcessor(threading.Thread):
"after": after,
"type": "new" if obj.previous["false_positive"] else "update",
}
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/events", json.dumps(message), retain=False
)
obj.previous = after
@@ -724,7 +724,7 @@ class TrackedObjectProcessor(threading.Thread):
"after": obj.to_dict(),
"type": "end",
}
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/events", json.dumps(message), retain=False
)
@@ -746,14 +746,14 @@ class TrackedObjectProcessor(threading.Thread):
f"Unable to send mqtt snapshot for {obj.obj_data['id']}."
)
else:
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/{obj.obj_data['label']}/snapshot",
jpg_bytes,
retain=True,
)
def object_status(camera, object_name, status):
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False
)
@@ -853,7 +853,7 @@ class TrackedObjectProcessor(threading.Thread):
if motion_boxes:
# only send ON if motion isn't already active
if self.last_motion_detected.get(camera, 0) == 0:
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/motion",
"ON",
retain=False,
@@ -866,7 +866,7 @@ class TrackedObjectProcessor(threading.Thread):
# If no motion, make sure the off_delay has passed
if frame_time - self.last_motion_detected.get(camera, 0) >= mqtt_delay:
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/motion",
"OFF",
retain=False,
@@ -962,7 +962,7 @@ class TrackedObjectProcessor(threading.Thread):
)
new_count = sum(zone_label.values())
if new_count != current_count:
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/{label}",
new_count,
retain=False,
@@ -975,7 +975,7 @@ class TrackedObjectProcessor(threading.Thread):
else:
if label in obj_counter:
zone_label[camera] = obj_counter[label]
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/{label}",
obj_counter[label],
retain=False,
@@ -992,7 +992,7 @@ class TrackedObjectProcessor(threading.Thread):
new_count = sum(zone_label.values())
if new_count != current_count:
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/all",
new_count,
retain=False,
@@ -1000,7 +1000,7 @@ class TrackedObjectProcessor(threading.Thread):
# if this is a new zone all label for this camera
else:
zone_label[camera] = total_label_count
self.client.publish(
self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/all",
total_label_count,
retain=False,