"""Thin HTTP client for the backend API (OpenAPI v1).

Provides ``request_api`` (a single HTTP call with token refresh on 401 and an
optional 404 fallback) plus small high-level helpers for the auth, users and
profiles endpoints. Configuration is read from Django settings and/or the
process environment.
"""
import json as _json
import logging
import os
import shlex
import uuid
from typing import Any, Dict, Optional, Tuple, List, Union, IO

import requests
from django.conf import settings
from django.core.exceptions import PermissionDenied

logger = logging.getLogger(__name__)


# === Logging configuration (tunable through .env) ============================
def _flag_setting(name: str, default: int = 1) -> bool:
    """Read a boolean switch; the environment wins over Django settings.

    Accepts integers (``0``/``1``), booleans and the usual textual spellings
    (``true``/``false``, ``yes``/``no``, ``on``/``off``), so a value such as
    ``API_DEBUG=true`` no longer crashes the module at import time.
    """
    raw = os.environ.get(name, getattr(settings, name, default))
    if isinstance(raw, str):
        text = raw.strip().lower()
        if text in ("true", "yes", "on"):
            return True
        if text in ("false", "no", "off", ""):
            return False
        return bool(int(text))
    return bool(int(raw))


API_DEBUG = _flag_setting("API_DEBUG")
API_LOG_BODY_MAX = int(os.environ.get("API_LOG_BODY_MAX", getattr(settings, "API_LOG_BODY_MAX", 2000)))
API_LOG_HEADERS = _flag_setting("API_LOG_HEADERS")
API_LOG_CURL = _flag_setting("API_LOG_CURL")
API_FALLBACK_OPENAPI_ON_404 = _flag_setting("API_FALLBACK_OPENAPI_ON_404")


# === API base and default paths (OpenAPI v1) =================================
def _from_settings_or_env(name: str, default: Optional[str] = None) -> Optional[str]:
    """Return ``name`` from Django settings, else the environment, else ``default``."""
    return getattr(settings, name, None) or os.environ.get(name, None) or default


_API_BASE_PRIMARY = (_from_settings_or_env("API_BASE_URL", None) or "http://localhost:8080").rstrip("/")
# Base that is probed when the primary base answers 404 (see request_api).
_API_BASE_FALLBACK = (_from_settings_or_env("API_FALLBACK_BASE_URL", None) or "http://localhost:8080").rstrip("/")
_API_BASE_CACHE = None  # currently selected base
_API_LAST_SELECT_SRC = "ENV/SETTINGS"

EP_DEFAULTS: Dict[str, str] = {
    # AUTH
    "AUTH_REGISTER_PATH": "/auth/v1/register",
    "AUTH_TOKEN_PATH": "/auth/v1/token",
    "AUTH_REFRESH_PATH": "/auth/v1/refresh",
    "AUTH_ME_PATH": "/auth/v1/me",
    # USERS (admin and/or owner)
    "USERS_LIST_PATH": "/auth/v1/users",
    "USER_DETAIL_PATH": "/auth/v1/users/{user_id}",
    # PROFILES
    "PROFILE_ME_PATH": "/profiles/v1/profiles/me",
    "PROFILE_CREATE_PATH": "/profiles/v1/profiles",
    # Optional, for when the server supports them:
    "PROFILE_ME_PATCH_PATH": "",  # empty -> not available on the backend
    "PROFILE_PHOTO_UPLOAD_PATH": "",  # "/profiles/v1/profiles/me/photo"
    "PROFILE_PHOTO_DELETE_PATH": "",  # "/profiles/v1/profiles/me/photo"
}

# Path used for photo upload when PROFILE_PHOTO_UPLOAD_PATH is not configured.
_DEFAULT_PHOTO_UPLOAD_PATH = "/profiles/v1/profiles/me/photo"


def EP(key: str, **fmt) -> str:
    """Return the endpoint path for ``key`` with ``fmt`` placeholders filled in.

    A path can be overridden through settings or the environment under the
    name ``API_<key>``. The result always starts with ``/``; an unconfigured
    (empty) endpoint therefore comes back as ``"/"``.

    Raises:
        KeyError: if ``key`` is unknown or a placeholder is missing in ``fmt``.
    """
    override = _from_settings_or_env(f"API_{key}", None)
    path = (override or EP_DEFAULTS[key]).format(**fmt)
    if not path.startswith("/"):
        path = "/" + path
    return path


def _get_api_base() -> str:
    """Return the active API base URL, selecting the primary one on first use."""
    global _API_BASE_CACHE, _API_LAST_SELECT_SRC
    if _API_BASE_CACHE:
        return _API_BASE_CACHE
    # First use: take the base from ENV/SETTINGS.
    _API_BASE_CACHE = _API_BASE_PRIMARY
    _API_LAST_SELECT_SRC = "ENV/SETTINGS"
    if API_DEBUG:
        logger.info("API base selected [%s]: %s", _API_LAST_SELECT_SRC, _API_BASE_CACHE)
    return _API_BASE_CACHE


def _set_api_base(new_base: str):
    """Switch the process-wide API base to ``new_base`` (no-op if unchanged or empty)."""
    global _API_BASE_CACHE, _API_LAST_SELECT_SRC
    if new_base and new_base.rstrip("/") != _API_BASE_CACHE:
        old = _API_BASE_CACHE
        _API_BASE_CACHE = new_base.rstrip("/")
        _API_LAST_SELECT_SRC = "FALLBACK"
        if API_DEBUG:
            logger.warning("API BASE SWITCHED: %s → %s", old, _API_BASE_CACHE)


# === API exception ===========================================================
class ApiError(Exception):
    """Raised when the API call fails; ``status`` is 0 for network errors."""

    def __init__(self, status: int, message: str = "API error", payload: Optional[dict] = None):
        super().__init__(message)
        self.status = status
        self.payload = payload or {}


# === Helpers =================================================================
_SENSITIVE_HEADERS = frozenset({"authorization", "x-api-key", "cookie", "set-cookie"})
_SENSITIVE_BODY_KEYS = frozenset({"password", "access_token", "refresh_token"})


def _headers(request, *, extra: Optional[Dict[str, str]] = None, json_mode: bool = True) -> Dict[str, str]:
    """Build outgoing headers: Accept, optional JSON Content-Type, auth, API key."""
    h = {"Accept": "application/json"}
    if json_mode:
        h["Content-Type"] = "application/json"
    # Token: from the session, else from the cookie.
    token = request.session.get("access_token") or request.COOKIES.get("access_token")
    if token:
        h["Authorization"] = f"Bearer {token}"
    api_key = getattr(settings, "API_KEY", "") or os.environ.get("API_KEY", "")
    if api_key:
        h["X-API-Key"] = api_key
    if extra:
        h.update(extra)
    return h


def _url(path: str) -> str:
    """Join ``path`` onto the active API base."""
    base = _get_api_base()
    path = path if path.startswith("/") else "/" + path
    return base + path


def _timeout() -> float:
    """Return the request timeout in seconds (settings, then env, then 6.0)."""
    return float(getattr(settings, "API_TIMEOUT", os.environ.get("API_TIMEOUT", 6.0)))


def _mask_headers(headers) -> Dict[str, Any]:
    """Return a copy of ``headers`` that is safe to log (credentials replaced)."""
    return {
        k: ("*****" if str(k).lower() in _SENSITIVE_HEADERS else v)
        for k, v in dict(headers or {}).items()
    }


def _mask_body(body: Any) -> Any:
    """Return ``body`` with top-level secret fields replaced; non-dicts pass through."""
    if isinstance(body, dict):
        return {k: ("***" if k in _SENSITIVE_BODY_KEYS else v) for k, v in body.items()}
    return body


def _parse_json(resp) -> Tuple[str, Any]:
    """Return ``(content_type, data)``; ``data`` is ``{}`` for non-JSON or invalid JSON."""
    content_type = resp.headers.get("Content-Type", "")
    if "application/json" not in content_type:
        return content_type, {}
    try:
        return content_type, resp.json()
    except ValueError:
        return content_type, {}


def _rewind_files(files: Optional[dict]) -> None:
    """Seek every file object in ``files`` back to the start before a retry.

    The first attempt consumes the streams; without rewinding, a retried
    multipart upload would send empty content.
    """
    if not isinstance(files, dict):
        return
    for value in files.values():
        # requests accepts either a bare file object or (filename, fileobj, ...).
        fobj = value[1] if isinstance(value, (tuple, list)) and len(value) > 1 else value
        seek = getattr(fobj, "seek", None)
        if callable(seek):
            try:
                seek(0)
            except (OSError, ValueError):
                logger.warning("Cannot rewind upload stream before retry")


def _log_curl(method: str, url: str, headers: Dict[str, str], body: Optional[Union[dict, str]]):
    """Log an equivalent, shell-safe ``curl`` command with credentials masked."""
    if not API_LOG_CURL:
        return
    parts = ["curl", "-X", method.upper(), shlex.quote(url)]
    for k, v in _mask_headers(headers).items():
        parts += ["-H", shlex.quote(f"{k}: {v}")]
    if body is not None and body != {}:
        b = body if isinstance(body, str) else _json.dumps(body, ensure_ascii=False)
        parts += ["-d", shlex.quote(b)]
    logger.debug("CURL> %s", " ".join(parts))


# === Base HTTP call with refresh / fallback ==================================
def request_api(
    request,
    method: str,
    path: str,
    *,
    params: Optional[dict] = None,
    json: Optional[dict] = None,
    files: Optional[dict] = None,
) -> Tuple[int, Any]:
    """Call the backend API and return ``(status_code, parsed_json)``.

    Flow:
      1. Send the request (JSON body, or multipart when ``files`` is given).
      2. On 401 with a refresh token available, refresh the token pair and
         repeat the request once.
      3. On 404, when ``API_FALLBACK_OPENAPI_ON_404`` is enabled, retry with a
         trailing slash and then against the fallback base; the fallback base
         is adopted process-wide only if it does not answer 404.

    Args:
        request: Django request; its session/cookies supply the tokens.
        method: HTTP method name.
        path: API path, joined onto the active base.
        params: Optional query parameters.
        json: Optional JSON body (ignored when ``files`` is given).
        files: Optional ``requests``-style files mapping for multipart upload.

    Returns:
        ``(status_code, data)`` for a 2xx response; ``data`` is ``{}`` when the
        response is not JSON.

    Raises:
        ApiError: network failure (status 0), failed retry after refresh
            (status 401), or any non-2xx status other than 401/403.
        PermissionDenied: the final status is 401 or 403.
    """
    rid = uuid.uuid4().hex[:8]
    url = _url(path)
    json_mode = files is None

    def send(target_url: str):
        # Headers are rebuilt per attempt so a refreshed token is picked up.
        return requests.request(
            method=method.upper(),
            url=target_url,
            headers=_headers(request, json_mode=json_mode),
            params=params,
            json=json if json_mode else None,
            files=files if not json_mode else None,
            timeout=_timeout(),
        )

    if API_DEBUG:
        headers = _headers(request, json_mode=json_mode)
        b_preview = _mask_body(json)
        logger.info(
            "API[req_id=%s] REQUEST %s %s params=%s headers=%s body=%s",
            rid, method.upper(), url, params,
            (_mask_headers(headers) if API_LOG_HEADERS else "{…}"),
            (str(b_preview)[:API_LOG_BODY_MAX] if b_preview else None),
        )
        _log_curl(method, url, headers, b_preview)

    try:
        resp = send(url)
    except requests.RequestException as e:
        logger.exception("API network error: %s", e)
        raise ApiError(0, f"Network unavailable or timeout when accessing API ({e})") from e

    content_type, data = _parse_json(resp)

    if API_DEBUG:
        b_preview = _mask_body(data)
        logger.info(
            "API[req_id=%s] RESPONSE %s %sms ct=%s headers=%s body=%s",
            rid, resp.status_code, int(resp.elapsed.total_seconds() * 1000), content_type,
            (_mask_headers(resp.headers) if API_LOG_HEADERS else "{…}"),
            (str(b_preview)[:API_LOG_BODY_MAX] if b_preview else None),
        )

    # Auto-retry on 401: try to refresh the token pair first.
    if resp.status_code == 401:
        refresh_token = request.session.get("refresh_token") or request.COOKIES.get("refresh_token")
        if refresh_token:
            if API_DEBUG:
                logger.info("API[req_id=%s] 401 → try refresh token", rid)
            try:
                r = requests.post(
                    _url(EP("AUTH_REFRESH_PATH")),
                    json={"refresh_token": refresh_token},
                    timeout=_timeout(),
                )
                tokens: Any = {}
                if r.status_code == 200:
                    try:
                        tokens = r.json()
                    except ValueError:
                        tokens = {}
                if isinstance(tokens, dict) and tokens.get("access_token"):
                    request.session["access_token"] = tokens["access_token"]
                    # Keep the current refresh token if the server did not rotate it.
                    if tokens.get("refresh_token"):
                        request.session["refresh_token"] = tokens["refresh_token"]
                    request.session.modified = True
                    if API_DEBUG:
                        logger.info("API[req_id=%s] REFRESH OK → retry original request", rid)
                    # Repeat the original request with the new token.
                    _rewind_files(files)
                    resp = send(url)
                    content_type, data = _parse_json(resp)
                else:
                    if API_DEBUG:
                        logger.warning("API[req_id=%s] REFRESH FAIL %s body=%s", rid, r.status_code, r.text[:200])
            except requests.RequestException as e:
                logger.exception("Refresh token error: %s", e)
                raise ApiError(401, f"Token refresh failed ({e})") from e

    # Base fallback on 404 (e.g. local gateway vs. 8080).
    if resp.status_code == 404 and API_FALLBACK_OPENAPI_ON_404:
        # First try the same URL with a trailing slash.
        if not url.endswith("/"):
            if API_DEBUG:
                logger.info("API[req_id=%s] 404 → retry with trailing slash", rid)
            try:
                _rewind_files(files)
                resp2 = send(url + "/")
            except requests.RequestException as e:
                # Best effort: keep the original 404 rather than leak a raw error.
                logger.warning("API[req_id=%s] trailing-slash retry failed: %s", rid, e)
            else:
                if resp2.status_code != 404:
                    resp = resp2
                    content_type, data = _parse_json(resp2)

        # Still 404 and not already on the fallback base: probe the fallback.
        if resp.status_code == 404 and _get_api_base() != _API_BASE_FALLBACK:
            if API_DEBUG:
                logger.warning(
                    "API[req_id=%s] 404 on base %s → retry on fallback API base %s",
                    rid, _get_api_base(), _API_BASE_FALLBACK,
                )
            fallback_url = _API_BASE_FALLBACK + (path if path.startswith("/") else "/" + path)
            try:
                _rewind_files(files)
                resp3 = send(fallback_url)
            except requests.RequestException as e:
                logger.warning("API[req_id=%s] fallback base retry failed: %s", rid, e)
            else:
                # Adopt the fallback base only when it actually serves the route;
                # an ordinary "not found" must not re-point the whole process.
                if resp3.status_code != 404:
                    _set_api_base(_API_BASE_FALLBACK)
                    resp = resp3
                    content_type, data = _parse_json(resp3)

    # Final status check.
    if not (200 <= resp.status_code < 300):
        msg = None
        if isinstance(data, dict):  # error bodies may also be JSON arrays/scalars
            msg = data.get("detail") or data.get("message")
        msg = msg or f"API error: {resp.status_code}"
        if resp.status_code in (401, 403):
            raise PermissionDenied(msg)
        payload = data if isinstance(data, dict) else {"detail": data}
        raise ApiError(resp.status_code, msg, payload)

    return resp.status_code, data


# === High-level helpers following OpenAPI v1 =================================
def register_user(request, email: str, password: str, full_name: Optional[str] = None, role: str = "CLIENT") -> Dict[str, Any]:
    """Register a user; returns the response body (UserRead)."""
    body = {"email": email, "password": password, "full_name": full_name, "role": role}
    _, data = request_api(request, "POST", EP("AUTH_REGISTER_PATH"), json=body)
    return data  # UserRead


def login(request, email: str, password: str) -> Dict[str, Any]:
    """Exchange credentials for tokens; returns the response body (TokenPair)."""
    body = {"email": email, "password": password}
    _, data = request_api(request, "POST", EP("AUTH_TOKEN_PATH"), json=body)
    return data  # TokenPair


def get_me(request) -> Dict[str, Any]:
    """Return the current user (UserRead)."""
    _, data = request_api(request, "GET", EP("AUTH_ME_PATH"))
    return data  # UserRead


def list_users(request, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """List users; ``limit`` is clamped to the range 1..200.

    Returns array[UserRead] (per the schema) or ``{"items": [...]}`` if the
    backend responds in that shape.
    """
    params = {"offset": offset, "limit": min(max(limit, 1), 200)}
    _, data = request_api(request, "GET", EP("USERS_LIST_PATH"), params=params)
    return data


def get_user(request, user_id: str) -> Dict[str, Any]:
    """Return one user by id (UserRead)."""
    _, data = request_api(request, "GET", EP("USER_DETAIL_PATH", user_id=user_id))
    return data  # UserRead


def update_user_me(request, user_id: str, *, full_name: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]:
    """PATCH the user with only the fields that were provided (UserRead)."""
    body: Dict[str, Any] = {}
    if full_name is not None:
        body["full_name"] = full_name
    if password is not None:
        body["password"] = password
    _, data = request_api(request, "PATCH", EP("USER_DETAIL_PATH", user_id=user_id), json=body)
    return data  # UserRead


def get_my_profile(request) -> Dict[str, Any]:
    """Return the current user's profile (ProfileOut)."""
    _, data = request_api(request, "GET", EP("PROFILE_ME_PATH"))
    return data  # ProfileOut


def create_my_profile(request, gender: str, city: str, languages: List[str], interests: List[str]) -> Dict[str, Any]:
    """Create the current user's profile (ProfileOut)."""
    body = {"gender": gender, "city": city, "languages": languages, "interests": interests}
    _, data = request_api(request, "POST", EP("PROFILE_CREATE_PATH"), json=body)
    return data  # ProfileOut


# Optional: for when the server adds PATCH /profiles/v1/profiles/me
def patch_my_profile(request, **fields) -> Dict[str, Any]:
    """PATCH the current user's profile with ``fields``.

    Raises:
        ApiError: status 405 when no patch endpoint is configured.
    """
    patch_path = EP("PROFILE_ME_PATCH_PATH")
    if not patch_path or patch_path.strip("/") == "":
        raise ApiError(405, "Profile update endpoint is not available on backend")
    _, data = request_api(request, "PATCH", patch_path, json=fields)
    return data


def upload_my_profile_photo(request, file_obj: IO[bytes]) -> Dict[str, Any]:
    """Upload the current user's photo as multipart/form-data (field: ``file``).

    Posts to the configured ``PROFILE_PHOTO_UPLOAD_PATH`` override, or to
    ``/profiles/v1/profiles/me/photo`` when none is configured. Returns the
    response body (ProfileOut with photo_url, per the OpenAPI description).
    """
    path = EP("PROFILE_PHOTO_UPLOAD_PATH")
    if path.strip("/") == "":
        path = _DEFAULT_PHOTO_UPLOAD_PATH
    filename = getattr(file_obj, 'name', 'photo.jpg')
    content_type = getattr(file_obj, 'content_type', 'application/octet-stream')
    files = {'file': (filename, file_obj, content_type)}
    _, data = request_api(request, 'POST', path, files=files)
    return data


def delete_my_photo(request) -> Dict[str, Any]:
    """Delete the current user's photo.

    Raises:
        ApiError: status 405 when no delete endpoint is configured.
    """
    path = EP("PROFILE_PHOTO_DELETE_PATH")
    if not path or path.strip("/") == "":
        raise ApiError(405, "Photo delete endpoint is not available on backend")
    _, data = request_api(request, "DELETE", path)
    return data