upgrade to python3.8 and switch from plasma store to shared_memory

This commit is contained in:
Blake Blackshear
2020-09-21 21:02:00 -05:00
parent b063099b2a
commit ec4d048905
7 changed files with 128 additions and 154 deletions

View File

@@ -2,12 +2,14 @@ import os
import datetime
import hashlib
import multiprocessing as mp
import queue
from multiprocessing.connection import Connection
from abc import ABC, abstractmethod
from typing import Dict
import numpy as np
import pyarrow.plasma as plasma
import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate
from frigate.util import EventsPerSecond, listen
from frigate.util import EventsPerSecond, listen, SharedMemoryFrameManager
def load_labels(path, encoding='utf-8'):
"""Loads labels from file (with or without index numbers).
@@ -100,73 +102,77 @@ class LocalObjectDetector(ObjectDetector):
return detections
def run_detector(detection_queue, avg_speed, start, tf_device):
def run_detector(detection_queue, result_connections: Dict[str, Connection], avg_speed, start, tf_device):
print(f"Starting detection process: {os.getpid()}")
listen()
plasma_client = plasma.connect("/tmp/plasma")
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(tf_device=tf_device)
while True:
object_id_str = detection_queue.get()
object_id_hash = hashlib.sha1(str.encode(object_id_str))
object_id = plasma.ObjectID(object_id_hash.digest())
object_id_out = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{object_id_str}")).digest())
input_frame = plasma_client.get(object_id, timeout_ms=0)
connection_id = detection_queue.get()
input_frame = frame_manager.get(connection_id, (1,300,300,3))
if input_frame is plasma.ObjectNotAvailable:
if input_frame is None:
continue
# detect and put the output in the plasma store
start.value = datetime.datetime.now().timestamp()
plasma_client.put(object_detector.detect_raw(input_frame), object_id_out)
# TODO: what is the overhead for pickling this result vs writing back to shared memory?
# I could try using an Event() and waiting in the other process before looking in memory...
detections = object_detector.detect_raw(input_frame)
result_connections[connection_id].send(detections)
duration = datetime.datetime.now().timestamp()-start.value
start.value = 0.0
avg_speed.value = (avg_speed.value*9 + duration)/10
class EdgeTPUProcess():
def __init__(self, tf_device=None):
def __init__(self, result_connections, tf_device=None):
self.result_connections = result_connections
self.detection_queue = mp.Queue()
self.avg_inference_speed = mp.Value('d', 0.01)
self.detection_start = mp.Value('d', 0.0)
self.detect_process = None
self.tf_device = tf_device
self.start_or_restart()
def stop(self):
self.detect_process.terminate()
print("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None:
print("Detection process didnt exit. Force killing...")
self.detect_process.kill()
self.detect_process.join()
def start_or_restart(self):
self.detection_start.value = 0.0
if (not self.detect_process is None) and self.detect_process.is_alive():
self.detect_process.terminate()
print("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None:
print("Detection process didnt exit. Force killing...")
self.detect_process.kill()
self.detect_process.join()
self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start, self.tf_device))
self.stop()
self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.result_connections, self.avg_inference_speed, self.detection_start, self.tf_device))
self.detect_process.daemon = True
self.detect_process.start()
class RemoteObjectDetector():
def __init__(self, name, labels, detection_queue):
def __init__(self, name, labels, detection_queue, result_connection: Connection):
self.labels = load_labels(labels)
self.name = name
self.fps = EventsPerSecond()
self.plasma_client = plasma.connect("/tmp/plasma")
self.detection_queue = detection_queue
self.result_connection = result_connection
self.shm = mp.shared_memory.SharedMemory(name=self.name, create=True, size=300*300*3)
self.np_shm = np.ndarray((1,300,300,3), dtype=np.uint8, buffer=self.shm.buf)
def detect(self, tensor_input, threshold=.4):
detections = []
now = f"{self.name}-{str(datetime.datetime.now().timestamp())}"
object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest())
object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest())
self.plasma_client.put(tensor_input, object_id_frame)
self.detection_queue.put(now)
raw_detections = self.plasma_client.get(object_id_detections, timeout_ms=10000)
if raw_detections is plasma.ObjectNotAvailable:
self.plasma_client.delete([object_id_frame])
# copy input to shared memory
# TODO: what if I just write it there in the first place?
self.np_shm[:] = tensor_input[:]
self.detection_queue.put(self.name)
if self.result_connection.poll(10):
raw_detections = self.result_connection.recv()
else:
return detections
for d in raw_detections:
@@ -177,6 +183,5 @@ class RemoteObjectDetector():
float(d[1]),
(d[2], d[3], d[4], d[5])
))
self.plasma_client.delete([object_id_frame, object_id_detections])
self.fps.update()
return detections

View File

@@ -10,9 +10,8 @@ import copy
import numpy as np
from collections import Counter, defaultdict
import itertools
import pyarrow.plasma as plasma
import matplotlib.pyplot as plt
from frigate.util import draw_box_with_label, PlasmaFrameManager
from frigate.util import draw_box_with_label, SharedMemoryFrameManager
from frigate.edgetpu import load_labels
from typing import Callable, Dict
from statistics import mean, median
@@ -59,7 +58,7 @@ class CameraState():
self.object_status = defaultdict(lambda: 'OFF')
self.tracked_objects = {}
self.zone_objects = defaultdict(lambda: [])
self.current_frame = np.zeros((720,1280,3), np.uint8)
self.current_frame = np.zeros(self.config['frame_shape'], np.uint8)
self.current_frame_time = 0.0
self.previous_frame_id = None
self.callbacks = defaultdict(lambda: [])
@@ -88,7 +87,7 @@ class CameraState():
self.current_frame_time = frame_time
# get the new frame and delete the old frame
frame_id = f"{self.name}{frame_time}"
self.current_frame = self.frame_manager.get(frame_id)
self.current_frame = self.frame_manager.get(frame_id, self.config['frame_shape'])
if not self.previous_frame_id is None:
self.frame_manager.delete(self.previous_frame_id)
self.previous_frame_id = frame_id
@@ -238,7 +237,7 @@ class TrackedObjectProcessor(threading.Thread):
self.event_queue = event_queue
self.stop_event = stop_event
self.camera_states: Dict[str, CameraState] = {}
self.plasma_client = PlasmaFrameManager(self.stop_event)
self.frame_manager = SharedMemoryFrameManager()
def start(camera, obj):
# publish events to mqtt
@@ -273,7 +272,7 @@ class TrackedObjectProcessor(threading.Thread):
self.client.publish(f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False)
for camera in self.camera_config.keys():
camera_state = CameraState(camera, self.camera_config[camera], self.plasma_client)
camera_state = CameraState(camera, self.camera_config[camera], self.frame_manager)
camera_state.on('start', start)
camera_state.on('update', update)
camera_state.on('end', end)

View File

@@ -9,7 +9,8 @@ import cv2
import threading
import matplotlib.pyplot as plt
import hashlib
import pyarrow.plasma as plasma
from multiprocessing import shared_memory
from typing import AnyStr
def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
if color is None:
@@ -148,12 +149,16 @@ def listen():
signal.signal(signal.SIGUSR1, print_stack)
class FrameManager(ABC):
@abstractmethod
def create(self, name, size) -> AnyStr:
pass
@abstractmethod
def get(self, name, timeout_ms=0):
pass
@abstractmethod
def put(self, name, frame):
def close(self, name):
pass
@abstractmethod
@@ -164,66 +169,45 @@ class DictFrameManager(FrameManager):
def __init__(self):
self.frames = {}
def get(self, name, timeout_ms=0):
return self.frames.get(name)
def create(self, name, size) -> AnyStr:
mem = bytearray(size)
self.frames[name] = mem
return mem
def put(self, name, frame):
self.frames[name] = frame
def get(self, name, shape):
mem = self.frames[name]
return np.ndarray(shape, dtype=np.uint8, buffer=mem)
def close(self, name):
pass
def delete(self, name):
del self.frames[name]
class PlasmaFrameManager(FrameManager):
def __init__(self, stop_event=None):
self.stop_event = stop_event
self.connect()
class SharedMemoryFrameManager(FrameManager):
def __init__(self):
self.shm_store = {}
def connect(self):
while True:
if self.stop_event != None and self.stop_event.is_set():
return
try:
self.plasma_client = plasma.connect("/tmp/plasma")
return
except:
print(f"TrackedObjectProcessor: unable to connect plasma client")
time.sleep(10)
def create(self, name, size) -> AnyStr:
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shm_store[name] = shm
return shm.buf
def get(self, name, timeout_ms=0):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
if self.stop_event != None and self.stop_event.is_set():
return
try:
frame = self.plasma_client.get(object_id, timeout_ms=timeout_ms)
if frame is plasma.ObjectNotAvailable:
return None
return frame
except:
self.connect()
time.sleep(1)
def get(self, name, shape):
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
def put(self, name, frame):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
if self.stop_event != None and self.stop_event.is_set():
return
try:
self.plasma_client.put(frame, object_id)
return
except Exception as e:
print(f"Failed to put in plasma: {e}")
self.connect()
time.sleep(1)
def close(self, name):
if name in self.shm_store:
self.shm_store[name].close()
del self.shm_store[name]
def delete(self, name):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
if self.stop_event != None and self.stop_event.is_set():
return
try:
self.plasma_client.delete([object_id])
return
except:
self.connect()
time.sleep(1)
if name in self.shm_store:
self.shm_store[name].close()
self.shm_store[name].unlink()
del self.shm_store[name]

View File

@@ -5,7 +5,6 @@ import cv2
import queue
import threading
import ctypes
import pyarrow.plasma as plasma
import multiprocessing as mp
import subprocess as sp
import numpy as np
@@ -15,7 +14,7 @@ import json
import base64
from typing import Dict, List
from collections import defaultdict
from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen, FrameManager, PlasmaFrameManager
from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen, FrameManager, SharedMemoryFrameManager
from frigate.objects import ObjectTracker
from frigate.edgetpu import RemoteObjectDetector
from frigate.motion import MotionDetector
@@ -154,11 +153,10 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram
continue
# put the frame in the frame manager
frame_manager.put(f"{camera_name}{current_frame.value}",
np
.frombuffer(frame_bytes, np.uint8)
.reshape(frame_shape)
)
frame_buffer = frame_manager.create(f"{camera_name}{current_frame.value}", frame_size)
frame_buffer[:] = frame_bytes[:]
frame_manager.close(f"{camera_name}{current_frame.value}")
# add to the queue
frame_queue.put(current_frame.value)
last_frame = current_frame.value
@@ -173,7 +171,7 @@ class CameraCapture(threading.Thread):
self.take_frame = take_frame
self.fps = fps
self.skipped_fps = EventsPerSecond()
self.plasma_client = PlasmaFrameManager(stop_event)
self.frame_manager = SharedMemoryFrameManager()
self.ffmpeg_process = ffmpeg_process
self.current_frame = mp.Value('d', 0.0)
self.last_frame = 0
@@ -182,10 +180,10 @@ class CameraCapture(threading.Thread):
def run(self):
self.skipped_fps.start()
capture_frames(self.ffmpeg_process, self.name, self.frame_shape, self.plasma_client, self.frame_queue, self.take_frame,
capture_frames(self.ffmpeg_process, self.name, self.frame_shape, self.frame_manager, self.frame_queue, self.take_frame,
self.fps, self.skipped_fps, self.stop_event, self.detection_frame, self.current_frame)
def track_camera(name, config, frame_queue, frame_shape, detection_queue, detected_objects_queue, fps, detection_fps, read_start, detection_frame, stop_event):
def track_camera(name, config, frame_queue, frame_shape, detection_queue, result_connection, detected_objects_queue, fps, detection_fps, read_start, detection_frame, stop_event):
print(f"Starting process for {name}: {os.getpid()}")
listen()
@@ -218,13 +216,13 @@ def track_camera(name, config, frame_queue, frame_shape, detection_queue, detect
mask[:] = 255
motion_detector = MotionDetector(frame_shape, mask, resize_factor=6)
object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue)
object_detector = RemoteObjectDetector(name, '/labelmap.txt', detection_queue, result_connection)
object_tracker = ObjectTracker(10)
plasma_client = PlasmaFrameManager()
frame_manager = SharedMemoryFrameManager()
process_frames(name, frame_queue, frame_shape, plasma_client, motion_detector, object_detector,
process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector,
object_tracker, detected_objects_queue, fps, detection_fps, detection_frame, objects_to_track, object_filters, mask, stop_event)
print(f"{name}: exiting subprocess")
@@ -281,7 +279,7 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
current_frame_time.value = frame_time
frame = frame_manager.get(f"{camera_name}{frame_time}")
frame = frame_manager.get(f"{camera_name}{frame_time}", frame_shape)
if frame is None:
print(f"{camera_name}: frame {frame_time} is not in memory store.")
@@ -364,3 +362,5 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
detected_objects_queue.put((camera_name, frame_time, object_tracker.tracked_objects))
detection_fps.value = object_detector.fps.eps()
frame_manager.close(f"{camera_name}{frame_time}")