Files
marriage_frontend/ui/api.py
2025-08-10 17:28:38 +09:00

306 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import uuid
from typing import Any, Dict, Optional, Tuple, List, Union, IO
import requests
from django.conf import settings
from django.core.exceptions import PermissionDenied
logger = logging.getLogger(__name__)
# === Конфиг лога (можно подкрутить через .env): ===============================
API_DEBUG = bool(int(os.environ.get("API_DEBUG", getattr(settings, "API_DEBUG", 1))))
API_LOG_BODY_MAX = int(os.environ.get("API_LOG_BODY_MAX", getattr(settings, "API_LOG_BODY_MAX", 2000)))
API_LOG_HEADERS = bool(int(os.environ.get("API_LOG_HEADERS", getattr(settings, "API_LOG_HEADERS", 1))))
API_LOG_CURL = bool(int(os.environ.get("API_LOG_CURL", getattr(settings, "API_LOG_CURL", 1))))
API_FALLBACK_OPENAPI_ON_404 = bool(int(os.environ.get("API_FALLBACK_OPENAPI_ON_404", getattr(settings, "API_FALLBACK_OPENAPI_ON_404", 1))))
# === База API и пути по умолчанию (OpenAPI v1): ===============================
def _from_settings_or_env(name: str, default: Optional[str] = None) -> Optional[str]:
return getattr(settings, name, None) or os.environ.get(name, None) or default
_API_BASE_PRIMARY = (_from_settings_or_env("API_BASE_URL", None) or "http://localhost:8080").rstrip("/")
_API_BASE_CACHE = None # текущая выбранная база
_API_LAST_SELECT_SRC = "ENV/SETTINGS"
EP_DEFAULTS: Dict[str, str] = {
# AUTH
"AUTH_REGISTER_PATH": "/auth/v1/register",
"AUTH_TOKEN_PATH": "/auth/v1/token",
"AUTH_REFRESH_PATH": "/auth/v1/refresh",
"AUTH_ME_PATH": "/auth/v1/me",
# USERS (admin and/or owner)
"USERS_LIST_PATH": "/auth/v1/users",
"USER_DETAIL_PATH": "/auth/v1/users/{user_id}",
# PROFILES
"PROFILE_ME_PATH": "/profiles/v1/profiles/me",
"PROFILE_CREATE_PATH": "/profiles/v1/profiles",
# Optional, если сервер будет поддерживать:
"PROFILE_ME_PATCH_PATH": "", # пусто → нет на бэкенде
"PROFILE_PHOTO_UPLOAD_PATH": "", # "/profiles/v1/profiles/me/photo"
"PROFILE_PHOTO_DELETE_PATH": "", # "/profiles/v1/profiles/me/photo"
}
def EP(key: str, **fmt) -> str:
# при желании можно переопределить путями в settings или ENV: API_<KEY>
override = _from_settings_or_env(f"API_{key}", None)
path = (override or EP_DEFAULTS[key]).format(**fmt)
if not path.startswith("/"):
path = "/" + path
return path
def _get_api_base() -> str:
global _API_BASE_CACHE, _API_LAST_SELECT_SRC
if _API_BASE_CACHE:
return _API_BASE_CACHE
# пробуем взять из ENV/SETTINGS
_API_BASE_CACHE = _API_BASE_PRIMARY
_API_LAST_SELECT_SRC = "ENV/SETTINGS"
if API_DEBUG:
logger.info("API base selected [%s]: %s", _API_LAST_SELECT_SRC, _API_BASE_CACHE)
return _API_BASE_CACHE
def _set_api_base(new_base: str):
global _API_BASE_CACHE, _API_LAST_SELECT_SRC
if new_base and new_base.rstrip("/") != _API_BASE_CACHE:
old = _API_BASE_CACHE
_API_BASE_CACHE = new_base.rstrip("/")
_API_LAST_SELECT_SRC = "FALLBACK"
if API_DEBUG:
logger.warning("API BASE SWITCHED: %s%s", old, _API_BASE_CACHE)
# === Исключение API: ==========================================================
class ApiError(Exception):
def __init__(self, status: int, message: str = "API error", payload: Optional[dict] = None):
super().__init__(message)
self.status = status
self.payload = payload or {}
# === Вспомогательные: =========================================================
def _headers(request, *, extra: Optional[Dict[str, str]] = None, json_mode: bool = True) -> Dict[str, str]:
h = {"Accept": "application/json"}
if json_mode:
h["Content-Type"] = "application/json"
# Токен: из сессии или из cookie (HTTPCookie)
token = request.session.get("access_token") or request.COOKIES.get("access_token")
if token:
h["Authorization"] = f"Bearer {token}"
api_key = getattr(settings, "API_KEY", "") or os.environ.get("API_KEY", "")
if api_key:
h["X-API-Key"] = api_key
if extra:
h.update(extra)
return h
def _url(path: str) -> str:
base = _get_api_base()
path = path if path.startswith("/") else "/" + path
return base + path
def _log_curl(method: str, url: str, headers: Dict[str, str], body: Optional[Union[dict, str]]):
if not API_LOG_CURL:
return
parts = ["curl", "-X", method.upper(), f"'{url}'"]
if headers:
for k, v in headers.items():
if k.lower() == "authorization":
v = "*****"
parts += ["-H", f"'{k}: {v}'"]
if body is not None and body != {}:
import json as _json
b = body if isinstance(body, str) else _json.dumps(body, ensure_ascii=False)
parts += ["-d", f"'{b}'"]
logger.debug("CURL> %s", " ".join(parts))
# === Базовый HTTP вызов с ретраями/refresh/fallback: ==========================
def request_api(
request,
method: str,
path: str,
*,
params: Optional[dict] = None,
json: Optional[dict] = None,
files: Optional[dict] = None,
) -> Tuple[int, Any]:
rid = uuid.uuid4().hex[:8]
url = _url(path)
json_mode = files is None
headers = _headers(request, json_mode=json_mode)
if API_DEBUG:
b_preview = ("***" if json and "password" in (json or {}) else json)
if isinstance(b_preview, dict) and "password" in (b_preview or {}):
b_preview = dict(b_preview); b_preview["password"] = "***"
logger.info("API[req_id=%s] REQUEST %s %s params=%s headers=%s body=%s",
rid, method.upper(), url, params, (headers if API_LOG_HEADERS else "{…}"),
(str(b_preview)[:API_LOG_BODY_MAX] if b_preview else None))
_log_curl(method, url, headers, b_preview)
try:
resp = requests.request(
method=method.upper(),
url=url,
headers=headers,
params=params,
json=json if json_mode else None,
files=files if not json_mode else None,
timeout=float(getattr(settings, "API_TIMEOUT", os.environ.get("API_TIMEOUT", 6.0))),
)
except requests.RequestException as e:
logger.exception("API network error: %s", e)
raise ApiError(0, f"Network unavailable or timeout when accessing API ({e})")
content_type = resp.headers.get("Content-Type", "")
try:
data = resp.json() if "application/json" in content_type else {}
except ValueError:
data = {}
if API_DEBUG:
b_preview = data
logger.info("API[req_id=%s] RESPONSE %s %sms ct=%s headers=%s body=%s",
rid, resp.status_code, int(resp.elapsed.total_seconds()*1000),
content_type, (dict(resp.headers) if API_LOG_HEADERS else "{…}"),
(str(b_preview)[:API_LOG_BODY_MAX] if b_preview else None))
# авто-ретрай на 401: пробуем refresh
if resp.status_code == 401 and (request.session.get("refresh_token") or request.COOKIES.get("refresh_token")):
if API_DEBUG:
logger.info("API[req_id=%s] 401 → try refresh token", rid)
try:
refresh_json = {"refresh_token": request.session.get("refresh_token") or request.COOKIES.get("refresh_token")}
r = requests.post(_url(EP("AUTH_REFRESH_PATH")), json=refresh_json, timeout=float(getattr(settings, "API_TIMEOUT", 6.0)))
if r.status_code == 200:
tokens = r.json()
request.session["access_token"] = tokens.get("access_token")
request.session["refresh_token"] = tokens.get("refresh_token")
request.session.modified = True
if API_DEBUG:
logger.info("API[req_id=%s] REFRESH OK → retry original request", rid)
# повторяем исходный
headers = _headers(request, json_mode=json_mode)
resp = requests.request(
method=method.upper(), url=url, headers=headers,
params=params, json=json if json_mode else None, files=files if not json_mode else None,
timeout=float(getattr(settings, "API_TIMEOUT", 6.0)),
)
content_type = resp.headers.get("Content-Type", "")
data = resp.json() if "application/json" in content_type else {}
else:
if API_DEBUG:
logger.warning("API[req_id=%s] REFRESH FAIL %s body=%s", rid, r.status_code, r.text[:200])
except requests.RequestException as e:
logger.exception("Refresh token error: %s", e)
raise ApiError(401, f"Token refresh failed ({e})")
# fallback базы на 404 (например, локальный gateway vs 8080)
if resp.status_code == 404 and API_FALLBACK_OPENAPI_ON_404:
# пробуем со слешем
if not url.endswith("/"):
url_slash = url + "/"
if API_DEBUG:
logger.info("API[req_id=%s] 404 → retry with trailing slash", rid)
resp2 = requests.request(method=method.upper(), url=url_slash, headers=headers,
params=params, json=json if json_mode else None, files=files if not json_mode else None,
timeout=float(getattr(settings, "API_TIMEOUT", 6.0)))
content_type2 = resp2.headers.get("Content-Type", "")
data2 = resp2.json() if "application/json" in content_type2 else {}
if resp2.status_code != 404:
resp, data, content_type = resp2, data2, content_type2
# если всё ещё 404 и текущая база не 8080 — переключаемся и повторяем
if resp.status_code == 404 and _get_api_base() != "http://localhost:8080":
if API_DEBUG:
logger.warning("API[req_id=%s] 404 on base %s → switch API base to http://localhost:8080 and retry",
rid, _get_api_base())
_set_api_base("http://localhost:8080")
url = _url(path)
headers = _headers(request, json_mode=json_mode)
resp = requests.request(method=method.upper(), url=url, headers=headers,
params=params, json=json if json_mode else None, files=files if not json_mode else None,
timeout=float(getattr(settings, "API_TIMEOUT", 6.0)))
content_type = resp.headers.get("Content-Type", "")
try:
data = resp.json() if "application/json" in content_type else {}
except ValueError:
data = {}
# финальная проверка кода
if not (200 <= resp.status_code < 300):
msg = data.get("detail") or data.get("message") or f"API error: {resp.status_code}"
if resp.status_code in (401, 403):
raise PermissionDenied(msg)
raise ApiError(resp.status_code, msg, data)
return resp.status_code, data
# === High-level helpers в соответствии с OpenAPI v1 ==========================
def register_user(request, email: str, password: str, full_name: Optional[str] = None, role: str = "CLIENT") -> Dict[str, Any]:
body = {"email": email, "password": password, "full_name": full_name, "role": role}
_, data = request_api(request, "POST", EP("AUTH_REGISTER_PATH"), json=body)
return data # UserRead
def login(request, email: str, password: str) -> Dict[str, Any]:
body = {"email": email, "password": password}
_, data = request_api(request, "POST", EP("AUTH_TOKEN_PATH"), json=body)
return data # TokenPair
def get_me(request) -> Dict[str, Any]:
_, data = request_api(request, "GET", EP("AUTH_ME_PATH"))
return data # UserRead
def list_users(request, offset: int = 0, limit: int = 50) -> List[Dict[str, Any]] | Dict[str, Any]:
params = {"offset": offset, "limit": min(max(limit, 1), 200)}
_, data = request_api(request, "GET", EP("USERS_LIST_PATH"), params=params)
return data # array[UserRead] (в схеме — массив) или {"items": [...]} если backend так возвращает
def get_user(request, user_id: str) -> Dict[str, Any]:
_, data = request_api(request, "GET", EP("USER_DETAIL_PATH", user_id=user_id))
return data # UserRead
def update_user_me(request, user_id: str, *, full_name: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]:
body: Dict[str, Any] = {}
if full_name is not None:
body["full_name"] = full_name
if password is not None:
body["password"] = password
_, data = request_api(request, "PATCH", EP("USER_DETAIL_PATH", user_id=user_id), json=body)
return data # UserRead
def get_my_profile(request) -> Dict[str, Any]:
_, data = request_api(request, "GET", EP("PROFILE_ME_PATH"))
return data # ProfileOut
def create_my_profile(request, gender: str, city: str, languages: List[str], interests: List[str]) -> Dict[str, Any]:
body = {"gender": gender, "city": city, "languages": languages, "interests": interests}
_, data = request_api(request, "POST", EP("PROFILE_CREATE_PATH"), json=body)
return data # ProfileOut
# Опционально: если сервер добавит PATCH /profiles/v1/profiles/me
def patch_my_profile(request, **fields) -> Dict[str, Any]:
patch_path = EP("PROFILE_ME_PATCH_PATH")
if not patch_path or patch_path.strip("/") == "":
raise ApiError(405, "Profile update endpoint is not available on backend")
_, data = request_api(request, "PATCH", patch_path, json=fields)
return data
def upload_my_profile_photo(request, file_obj: IO[bytes]) -> Dict[str, Any]:
"""
POST /profiles/v1/profiles/me/photo (multipart/form-data, field: file)
Возвращает ProfileOut с photo_url. (см. OpenAPI) :contentReference[oaicite:1]{index=1}
"""
filename = getattr(file_obj, 'name', 'photo.jpg')
content_type = getattr(file_obj, 'content_type', 'application/octet-stream')
files = {'file': (filename, file_obj, content_type)}
_, data = request_api(request, 'POST', '/profiles/v1/profiles/me/photo', files=files)
return data
def delete_my_photo(request) -> Dict[str, Any]:
path = EP("PROFILE_PHOTO_DELETE_PATH")
if not path or path.strip("/") == "":
raise ApiError(405, "Photo delete endpoint is not available on backend")
_, data = request_api(request, "DELETE", path)
return data