import logging import os import os.path import json import time import uuid from typing import Any, Dict, Optional, Tuple, List, Union import requests from django.conf import settings from django.core.exceptions import PermissionDenied logger = logging.getLogger(__name__) # ===== Логирование / флаги ===== API_DEBUG = os.environ.get('API_DEBUG', '1') == '1' API_LOG_BODY_MAX = int(os.environ.get('API_LOG_BODY_MAX', '2000')) API_LOG_HEADERS = os.environ.get('API_LOG_HEADERS', '1') == '1' API_LOG_CURL = os.environ.get('API_LOG_CURL', '0') == '1' # Переключение базы при 404: сначала servers[0].url из openapi.json, потом жёстко http://localhost:8080 API_FALLBACK_OPENAPI_ON_404 = os.environ.get('API_FALLBACK_OPENAPI_ON_404', '1') == '1' SENSITIVE_KEYS = {'password', 'refresh_token', 'access_token', 'authorization', 'token', 'api_key'} def _sanitize(obj: Any) -> Any: try: if isinstance(obj, dict): return {k: ('***' if isinstance(k, str) and k.lower() in SENSITIVE_KEYS else _sanitize(v)) for k, v in obj.items()} if isinstance(obj, list): return [_sanitize(x) for x in obj] return obj except Exception: return obj def _shorten(s: str, limit: int) -> str: if s is None: return '' return s if len(s) <= limit else (s[:limit] + f'... ') def _build_curl(method: str, url: str, headers: Dict[str, str], params: Optional[dict], json_body: Optional[dict], data: Optional[dict]) -> str: parts = [f"curl -X '{method.upper()}' \\\n '{url}'"] for k, v in headers.items(): if k.lower() == 'authorization': v = 'Bearer ***' parts.append(f" -H '{k}: {v}'") if json_body is not None: parts.append(" -H 'Content-Type: application/json'") try: body_str = json.dumps(_sanitize(json_body), ensure_ascii=False) except Exception: body_str = str(_sanitize(json_body)) parts.append(f" -d '{body_str}'") elif data is not None: try: body_str = json.dumps(_sanitize(data), ensure_ascii=False) except Exception: body_str = str(_sanitize(data)) parts.append(f" --data-raw '{body_str}'") return " \\\n".join(parts) # ===== Пути эндпоинтов (как в swagger) ===== EP_DEFAULTS: Dict[str, str] = { # Auth 'AUTH_REGISTER_PATH': '/auth/v1/register', 'AUTH_TOKEN_PATH': '/auth/v1/token', 'AUTH_REFRESH_PATH': '/auth/v1/refresh', 'ME_PATH': '/auth/v1/me', 'USERS_LIST_PATH': '/auth/v1/users', 'USER_DETAIL_PATH': '/auth/v1/users/{user_id}', # Profiles 'PROFILE_ME_PATH': '/profiles/v1/profiles/me', 'PROFILES_CREATE_PATH': '/profiles/v1/profiles', 'PROFILE_PHOTO_UPLOAD_PATH': '/profiles/v1/profiles/me/photo', 'PROFILE_PHOTO_DELETE_PATH': '/profiles/v1/profiles/me/photo', # Pairs 'PAIRS_PATH': '/match/v1/pairs', 'PAIR_DETAIL_PATH': '/match/v1/pairs/{pair_id}', 'PAIR_ACCEPT_PATH': '/match/v1/pairs/{pair_id}/accept', 'PAIR_REJECT_PATH': '/match/v1/pairs/{pair_id}/reject', # Chat 'ROOMS_PATH': '/chat/v1/rooms', 'ROOM_DETAIL_PATH': '/chat/v1/rooms/{room_id}', 'ROOM_MESSAGES_PATH': '/chat/v1/rooms/{room_id}/messages', # Payments 'INVOICES_PATH': '/payments/v1/invoices', 'INVOICE_DETAIL_PATH': '/payments/v1/invoices/{inv_id}', 'INVOICE_MARK_PAID_PATH': '/payments/v1/invoices/{inv_id}/mark-paid', } def EP(key: str) -> str: return os.environ.get(f'API_{key}', EP_DEFAULTS[key]) # ===== База API с авто‑детектом ===== _API_BASE_CACHE: Optional[str] = None _API_LAST_SELECT_SRC = 'DEFAULT' # для логов def _detect_api_base_from_openapi() -> Optional[str]: """ Берём servers[0].url из openapi.json (по схеме — http://localhost:8080). """ candidates = [ os.environ.get('API_SPEC_PATH'), getattr(settings, 'API_SPEC_PATH', None), os.path.join(getattr(settings, 'BASE_DIR', ''), 'openapi.json') if getattr(settings, 'BASE_DIR', None) else None, os.path.join(getattr(settings, 'BASE_DIR', ''), 'agency', 'openapi.json') if getattr(settings, 'BASE_DIR', None) else None, '/mnt/data/openapi.json', ] for p in candidates: if p and os.path.isfile(p): try: with open(p, 'r', encoding='utf-8') as f: spec = json.load(f) servers = spec.get('servers') or [] if servers and isinstance(servers[0], dict): url = servers[0].get('url') if url: return url except Exception as e: if API_DEBUG: logger.debug('API: cannot read OpenAPI from %s: %s', p, e) return None def _get_api_base_url() -> str: """ Источники (по приоритету): 1) ENV/Settings: API_BASE_URL, затем BASE_URL 2) servers[0].url из openapi.json 3) 'http://localhost:8080' """ global _API_BASE_CACHE, _API_LAST_SELECT_SRC if _API_BASE_CACHE: return _API_BASE_CACHE base = (os.environ.get('API_BASE_URL') or getattr(settings, 'API_BASE_URL', '') or os.environ.get('BASE_URL') or getattr(settings, 'BASE_URL', '')) if base: _API_BASE_CACHE = base.rstrip('/') _API_LAST_SELECT_SRC = 'ENV/SETTINGS' else: detected = _detect_api_base_from_openapi() if detected: _API_BASE_CACHE = detected.rstrip('/') _API_LAST_SELECT_SRC = 'OPENAPI' else: _API_BASE_CACHE = 'http://localhost:8080' _API_LAST_SELECT_SRC = 'HARDCODED' if API_DEBUG: logger.info("API base selected [%s]: %s", _API_LAST_SELECT_SRC, _API_BASE_CACHE) return _API_BASE_CACHE class ApiError(Exception): def __init__(self, status: int, message: str = 'API error', payload: Optional[dict] = None, req_id: Optional[str] = None): if req_id and message and 'req_id=' not in message: message = f"{message} (req_id={req_id})" super().__init__(message) self.status = status self.payload = payload or {} self.req_id = req_id def _base_headers(request, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]: """ Достаём токен сначала из сессии, затем из куки — чтобы «куки‑режим» работал без доп. настроек. """ headers: Dict[str, str] = {'Accept': 'application/json'} token = request.session.get('access_token') or request.COOKIES.get('access_token') if token: headers['Authorization'] = f'Bearer {token}' api_key = getattr(settings, 'API_KEY', '') or os.environ.get('API_KEY', '') if api_key: headers['X-API-Key'] = api_key if extra: headers.update(extra) return headers def _url(path: str) -> str: base = _get_api_base_url() path = path if path.startswith('/') else '/' + path return base + path def request_api( request, method: str, path: str, *, params: Optional[dict] = None, json: Optional[dict] = None, files: Optional[dict] = None, data: Optional[dict] = None, ) -> Tuple[int, Any]: """ Универсальный HTTP-вызов (упрощённая версия): - токен/ключ в заголовках (токен берём из сессии или куки), - auto-refresh при 401 (refresh берём тоже из сессии или куки), - 404 → переключаем базу: servers[0].url из openapi.json → 'http://localhost:8080', - подробные логи; БЕЗ «retry со слэшем». """ global _API_BASE_CACHE, _API_LAST_SELECT_SRC req_id = uuid.uuid4().hex[:8] base_before = _get_api_base_url() url = _url(path) def _do(_url: str): headers = _base_headers(request) if json is not None and files is None and data is None: headers['Content-Type'] = 'application/json' if API_DEBUG: log_headers = _sanitize(headers) if API_LOG_HEADERS else {} log_body = _sanitize(json if json is not None else data) if API_LOG_CURL: try: curl = _build_curl(method, _url, headers, params, json, data) logger.debug("API[req_id=%s] cURL:\n%s", req_id, curl) except Exception: pass logger.info( "API[req_id=%s] REQUEST %s %s params=%s headers=%s body=%s", req_id, method.upper(), _url, _sanitize(params), log_headers, log_body ) t0 = time.time() resp = requests.request( method=method.upper(), url=_url, headers=headers, params=params, json=json, data=data, files=files, timeout=float(getattr(settings, 'API_TIMEOUT', 8.0)), ) dt = int((time.time() - t0) * 1000) content_type = resp.headers.get('Content-Type', '') try: payload = resp.json() if 'application/json' in content_type else {} except ValueError: payload = {} if API_DEBUG: body_str = "" try: body_str = json.dumps(_sanitize(payload), ensure_ascii=False) except Exception: body_str = str(_sanitize(payload)) headers_out = _sanitize(dict(resp.headers)) if API_LOG_HEADERS else {} logger.info( "API[req_id=%s] RESPONSE %s %sms ct=%s headers=%s body=%s", req_id, resp.status_code, dt, content_type, headers_out, _shorten(body_str, API_LOG_BODY_MAX) ) return resp, payload # 1) Первый запрос try: resp, payload = _do(url) except requests.RequestException as e: logger.exception('API[req_id=%s] network error: %s', req_id, e) raise ApiError(0, f'Network unavailable or timeout when accessing API ({e})', req_id=req_id) # 2) 404 → переключаем базу (openapi → 8080) и повторяем if resp.status_code == 404: candidates: List[tuple[str, str]] = [] if API_FALLBACK_OPENAPI_ON_404: detected = _detect_api_base_from_openapi() if detected: candidates.append((detected.rstrip('/'), 'OPENAPI(FAILOVER)')) candidates.append(('http://localhost:8080', 'DEFAULT(FAILOVER)')) for cand_base, label in candidates: if not cand_base or cand_base == _API_BASE_CACHE: continue if API_DEBUG: logger.warning("API[req_id=%s] 404 on base %s → switch API base to %s and retry", req_id, _API_BASE_CACHE, cand_base) _API_BASE_CACHE = cand_base _API_LAST_SELECT_SRC = label try: resp, payload = _do(_url(path)) if resp.status_code != 404: break except requests.RequestException: continue # 3) 401 → refresh и повтор refresh_token = request.session.get('refresh_token') or request.COOKIES.get('refresh_token') if resp.status_code == 401 and refresh_token: if API_DEBUG: logger.info("API[req_id=%s] 401 → try refresh token", req_id) try: refresh_url = _url(EP('AUTH_REFRESH_PATH')) refresh_body = {'refresh_token': refresh_token} logger.info("API[req_id=%s] REFRESH POST %s body=%s", req_id, refresh_url, _sanitize(refresh_body)) refresh_resp = requests.post(refresh_url, json=refresh_body, timeout=float(getattr(settings, 'API_TIMEOUT', 8.0))) if refresh_resp.status_code == 200: try: rj = refresh_resp.json() except ValueError: rj = {} if rj.get('access_token'): request.session['access_token'] = rj['access_token'] if rj.get('refresh_token'): request.session['refresh_token'] = rj['refresh_token'] request.session.modified = True if API_DEBUG: logger.info("API[req_id=%s] REFRESH OK → retry original request", req_id) resp, payload = _do(_url(path)) else: logger.warning("API[req_id=%s] REFRESH failed: %s", req_id, refresh_resp.status_code) except requests.RequestException as e: logger.exception('API[req_id=%s] Refresh token network error: %s', req_id, e) raise ApiError(401, f'Token refresh failed ({e})', req_id=req_id) # 4) Ошибки if not (200 <= resp.status_code < 300): msg = None if isinstance(payload, dict): msg = payload.get('detail') or payload.get('message') msg = msg or f'API error: {resp.status_code}' if resp.status_code in (401, 403): # PermissionDenied обрабатываем во view (не всегда это «выйти и войти заново») raise PermissionDenied(f"{msg} (req_id={req_id})") raise ApiError(resp.status_code, msg, payload if isinstance(payload, dict) else {}, req_id=req_id) # 5) База сменилась — отметим base_after = _get_api_base_url() if API_DEBUG and base_before != base_after: logger.warning("API[req_id=%s] BASE SWITCHED: %s → %s", req_id, base_before, base_after) return resp.status_code, payload # ========================== # AUTH # ========================== def register_user(request, email: str, password: str, full_name: Optional[str] = None, role: str = 'CLIENT') -> Dict[str, Any]: body = {'email': email, 'password': password, 'full_name': full_name, 'role': role} _, data = request_api(request, 'POST', EP('AUTH_REGISTER_PATH'), json=body) return data # UserRead def login(request, email: str, password: str) -> Dict[str, Any]: body = {'email': email, 'password': password} _, data = request_api(request, 'POST', EP('AUTH_TOKEN_PATH'), json=body) return data # TokenPair def get_current_user(request) -> Dict[str, Any]: _, data = request_api(request, 'GET', EP('ME_PATH')) return data # UserRead def list_users(request, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]: params = {'offset': offset, 'limit': limit} _, data = request_api(request, 'GET', EP('USERS_LIST_PATH'), params=params) return data def get_user(request, user_id: str) -> Dict[str, Any]: path = EP('USER_DETAIL_PATH').format(user_id=user_id) _, data = request_api(request, 'GET', path) return data def update_user(request, user_id: str, **fields) -> Dict[str, Any]: path = EP('USER_DETAIL_PATH').format(user_id=user_id) _, data = request_api(request, 'PATCH', path, json=fields) return data def delete_user(request, user_id: str) -> None: path = EP('USER_DETAIL_PATH').format(user_id=user_id) request_api(request, 'DELETE', path) # ========================== # PROFILES # ========================== def get_my_profile(request) -> Dict[str, Any]: _, data = request_api(request, 'GET', EP('PROFILE_ME_PATH')) return data # ProfileOut def create_my_profile(request, gender: str, city: str, languages: List[str], interests: List[str]) -> Dict[str, Any]: body = {'gender': gender, 'city': city, 'languages': languages, 'interests': interests} _, data = request_api(request, 'POST', EP('PROFILES_CREATE_PATH'), json=body) return data # ProfileOut # ========================== # PAIRS # ========================== def create_pair(request, user_id_a: str, user_id_b: str, score: Optional[float] = None, notes: Optional[str] = None) -> Dict[str, Any]: body = {'user_id_a': user_id_a, 'user_id_b': user_id_b, 'score': score, 'notes': notes} _, data = request_api(request, 'POST', EP('PAIRS_PATH'), json=body) return data # PairRead def list_pairs(request, for_user_id: Optional[str] = None, status: Optional[str] = None, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]: params = {'for_user_id': for_user_id, 'status': status, 'offset': offset, 'limit': limit} params = {k: v for k, v in params.items() if v is not None} _, data = request_api(request, 'GET', EP('PAIRS_PATH'), params=params) return data def get_pair(request, pair_id: str) -> Dict[str, Any]: path = EP('PAIR_DETAIL_PATH').format(pair_id=pair_id) _, data = request_api(request, 'GET', path) return data def update_pair(request, pair_id: str, **fields) -> Dict[str, Any]: path = EP('PAIR_DETAIL_PATH').format(pair_id=pair_id) _, data = request_api(request, 'PATCH', path, json=fields) return data def delete_pair(request, pair_id: str) -> None: path = EP('PAIR_DETAIL_PATH').format(pair_id=pair_id) request_api(request, 'DELETE', path) def accept_pair(request, pair_id: str) -> Dict[str, Any]: path = EP('PAIR_ACCEPT_PATH').format(pair_id=pair_id) _, data = request_api(request, 'POST', path) return data def reject_pair(request, pair_id: str) -> Dict[str, Any]: path = EP('PAIR_REJECT_PATH').format(pair_id=pair_id) _, data = request_api(request, 'POST', path) return data # ========================== # CHAT # ========================== def my_rooms(request) -> Union[List[Dict[str, Any]], Dict[str, Any]]: _, data = request_api(request, 'GET', EP('ROOMS_PATH')) return data def get_room(request, room_id: str) -> Dict[str, Any]: path = EP('ROOM_DETAIL_PATH').format(room_id=room_id) _, data = request_api(request, 'GET', path) return data def create_room(request, title: Optional[str] = None, participants: List[str] = []) -> Dict[str, Any]: body = {'title': title, 'participants': participants} _, data = request_api(request, 'POST', EP('ROOMS_PATH'), json=body) return data def send_message(request, room_id: str, content: str) -> Dict[str, Any]: body = {'content': content} path = EP('ROOM_MESSAGES_PATH').format(room_id=room_id) _, data = request_api(request, 'POST', path, json=body) return data def list_messages(request, room_id: str, offset: int = 0, limit: int = 100) -> Union[List[Dict[str, Any]], Dict[str, Any]]: params = {'offset': offset, 'limit': limit} path = EP('ROOM_MESSAGES_PATH').format(room_id=room_id) _, data = request_api(request, 'GET', path, params=params) return data # ========================== # PAYMENTS # ========================== def create_invoice(request, client_id: str, amount: float, currency: str, description: Optional[str] = None) -> Dict[str, Any]: body = {'client_id': client_id, 'amount': amount, 'currency': currency, 'description': description} _, data = request_api(request, 'POST', EP('INVOICES_PATH'), json=body) return data def list_invoices(request, client_id: Optional[str] = None, status: Optional[str] = None, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]: params = {'client_id': client_id, 'status': status, 'offset': offset, 'limit': limit} params = {k: v for k, v in params.items() if v is not None} _, data = request_api(request, 'GET', EP('INVOICES_PATH'), params=params) return data def get_invoice(request, inv_id: str) -> Dict[str, Any]: path = EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id) _, data = request_api(request, 'GET', path) return data def update_invoice(request, inv_id: str, **fields) -> Dict[str, Any]: path = EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id) _, data = request_api(request, 'PATCH', path, json=fields) return data def delete_invoice(request, inv_id: str) -> None: path = EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id) request_api(request, 'DELETE', path) def mark_invoice_paid(request, inv_id: str) -> Dict[str, Any]: path = EP('INVOICE_MARK_PAID_PATH').format(inv_id=inv_id) _, data = request_api(request, 'POST', path) return data def upload_my_photo(request, file_obj) -> Dict[str, Any]: """ Отправляет multipart/form-data на бекенд для загрузки фото профиля. Ожидаем, что сервер примет поле 'file' и вернёт обновлённый профиль или {photo_url: "..."}. """ path = EP('PROFILE_PHOTO_UPLOAD_PATH') filename = getattr(file_obj, 'name', 'photo.jpg') content_type = getattr(file_obj, 'content_type', 'application/octet-stream') files = {'file': (filename, file_obj, content_type)} _, data = request_api(request, 'POST', path, files=files) return data def delete_my_photo(request) -> Dict[str, Any]: """ Удаляет фото профиля (если сервер поддерживает DELETE на том же пути). """ path = EP('PROFILE_PHOTO_DELETE_PATH') _, data = request_api(request, 'DELETE', path) return data