Typing: mypy fixes for

* __main__.py
 * app.py
 * models.py
 * plus.py
 * stats.py

In addition a new module was introduced: types
There all TypedDicts are included. Bitte geben Sie eine Commit-Beschreibung für Ihre Änderungen ein. Zeilen,
This commit is contained in:
Sebastian Englbrecht
2022-04-16 17:40:04 +02:00
committed by Blake Blackshear
parent ebf4e43ced
commit cafe0917c7
10 changed files with 174 additions and 87 deletions

View File

@@ -1,12 +1,16 @@
import datetime
import logging
import os
import requests
from frigate.const import PLUS_ENV_VAR, PLUS_API_HOST
from requests.models import Response
import cv2
from numpy import ndarray
logger = logging.getLogger(__name__)
def get_jpg_bytes(image, max_dim, quality):
def get_jpg_bytes(image: ndarray, max_dim: int, quality: int) -> bytes:
if image.shape[1] >= image.shape[0]:
width = min(max_dim, image.shape[1])
height = int(width * image.shape[0] / image.shape[1])
@@ -17,45 +21,54 @@ def get_jpg_bytes(image, max_dim, quality):
original = cv2.resize(image, dsize=(width, height), interpolation=cv2.INTER_AREA)
ret, jpg = cv2.imencode(".jpg", original, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
return jpg.tobytes()
jpg_bytes = jpg.tobytes()
return jpg_bytes if type(jpg_bytes) is bytes else b""
class PlusApi:
def __init__(self, host, key: str):
self.host = host
self.key = key
self._token_data = None
def __init__(self) -> None:
self.host = PLUS_API_HOST
if PLUS_ENV_VAR in os.environ:
self.key = os.environ.get(PLUS_ENV_VAR)
else:
self.key = None
self._is_active: bool = self.key is not None
self._token_data: dict = {}
def _refresh_token_if_needed(self):
def _refresh_token_if_needed(self) -> None:
if (
self._token_data is None
self._token_data.get("expires") is None
or self._token_data["expires"] - datetime.datetime.now().timestamp() < 60
):
if self.key is None:
raise Exception("Plus API not activated")
parts = self.key.split(":")
r = requests.get(f"{self.host}/v1/auth/token", auth=(parts[0], parts[1]))
self._token_data = r.json()
def _get_authorization_header(self):
def _get_authorization_header(self) -> dict:
self._refresh_token_if_needed()
return {"authorization": f"Bearer {self._token_data['accessToken']}"}
return {"authorization": f"Bearer {self._token_data.get('accessToken')}"}
def _get(self, path):
def _get(self, path: str) -> Response:
return requests.get(
f"{self.host}/v1/{path}", headers=self._get_authorization_header()
)
def _post(self, path, data):
def _post(self, path: str, data: dict) -> Response:
return requests.post(
f"{self.host}/v1/{path}",
headers=self._get_authorization_header(),
json=data,
)
def upload_image(self, image, camera: str):
def is_active(self) -> bool:
return self._is_active
def upload_image(self, image: ndarray, camera: str) -> str:
r = self._get("image/signed_urls")
presigned_urls = r.json()
if not r.ok:
logger.exception(ex)
raise Exception("Unable to get signed urls")
# resize and submit original
@@ -93,4 +106,4 @@ class PlusApi:
raise Exception(r.text)
# return image id
return presigned_urls["imageId"]
return str(presigned_urls.get("imageId"))