527 lines
21 KiB
Python
527 lines
21 KiB
Python
import logging
|
||
import os
|
||
import os.path
|
||
import json
|
||
import time
|
||
import uuid
|
||
from typing import Any, Dict, Optional, Tuple, List, Union
|
||
|
||
import requests
|
||
from django.conf import settings
|
||
from django.core.exceptions import PermissionDenied
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ===== Logging / feature flags =====
# Each boolean flag is enabled when its environment variable equals the string '1'.
API_DEBUG = os.environ.get('API_DEBUG', '1') == '1'
# Maximum number of characters of a response body written to the log.
API_LOG_BODY_MAX = int(os.environ.get('API_LOG_BODY_MAX', '2000'))
API_LOG_HEADERS = os.environ.get('API_LOG_HEADERS', '1') == '1'
API_LOG_CURL = os.environ.get('API_LOG_CURL', '0') == '1'

# Switching the API base on a 404: first servers[0].url from openapi.json, then the hard-coded http://localhost:8080
API_FALLBACK_OPENAPI_ON_404 = os.environ.get('API_FALLBACK_OPENAPI_ON_404', '1') == '1'

# Lower-cased dict keys / header names whose values must never reach the logs.
# 'x-api-key' is the header emitted by _base_headers(); 'cookie' and
# 'set-cookie' carry session/token material in request and response headers.
SENSITIVE_KEYS = {
    'password', 'refresh_token', 'access_token', 'authorization', 'token', 'api_key',
    'x-api-key', 'cookie', 'set-cookie',
}
|
||
|
||
|
||
def _sanitize(obj: Any) -> Any:
|
||
try:
|
||
if isinstance(obj, dict):
|
||
return {k: ('***' if isinstance(k, str) and k.lower() in SENSITIVE_KEYS else _sanitize(v))
|
||
for k, v in obj.items()}
|
||
if isinstance(obj, list):
|
||
return [_sanitize(x) for x in obj]
|
||
return obj
|
||
except Exception:
|
||
return obj
|
||
|
||
|
||
def _shorten(s: str, limit: int) -> str:
|
||
if s is None:
|
||
return ''
|
||
return s if len(s) <= limit else (s[:limit] + f'... <truncated {len(s)-limit} chars>')
|
||
|
||
|
||
def _build_curl(method: str, url: str, headers: Dict[str, str],
|
||
params: Optional[dict], json_body: Optional[dict], data: Optional[dict]) -> str:
|
||
parts = [f"curl -X '{method.upper()}' \\\n '{url}'"]
|
||
for k, v in headers.items():
|
||
if k.lower() == 'authorization':
|
||
v = 'Bearer ***'
|
||
parts.append(f" -H '{k}: {v}'")
|
||
if json_body is not None:
|
||
parts.append(" -H 'Content-Type: application/json'")
|
||
try:
|
||
body_str = json.dumps(_sanitize(json_body), ensure_ascii=False)
|
||
except Exception:
|
||
body_str = str(_sanitize(json_body))
|
||
parts.append(f" -d '{body_str}'")
|
||
elif data is not None:
|
||
try:
|
||
body_str = json.dumps(_sanitize(data), ensure_ascii=False)
|
||
except Exception:
|
||
body_str = str(_sanitize(data))
|
||
parts.append(f" --data-raw '{body_str}'")
|
||
return " \\\n".join(parts)
|
||
|
||
|
||
# ===== Endpoint paths (as in swagger) =====
# Default URL path for every backend endpoint, keyed by a symbolic name.
# EP() looks these up and lets an ``API_<KEY>`` environment variable override
# any of them. Placeholders such as ``{user_id}`` are filled in by the caller
# with ``str.format``.
EP_DEFAULTS: Dict[str, str] = {
    # Auth
    'AUTH_REGISTER_PATH': '/auth/v1/register',
    'AUTH_TOKEN_PATH': '/auth/v1/token',
    'AUTH_REFRESH_PATH': '/auth/v1/refresh',
    'ME_PATH': '/auth/v1/me',
    'USERS_LIST_PATH': '/auth/v1/users',
    'USER_DETAIL_PATH': '/auth/v1/users/{user_id}',

    # Profiles
    'PROFILE_ME_PATH': '/profiles/v1/profiles/me',
    'PROFILES_CREATE_PATH': '/profiles/v1/profiles',
    # Upload and delete share one path; the wrappers use POST and DELETE on it.
    'PROFILE_PHOTO_UPLOAD_PATH': '/profiles/v1/profiles/me/photo',
    'PROFILE_PHOTO_DELETE_PATH': '/profiles/v1/profiles/me/photo',

    # Pairs
    'PAIRS_PATH': '/match/v1/pairs',
    'PAIR_DETAIL_PATH': '/match/v1/pairs/{pair_id}',
    'PAIR_ACCEPT_PATH': '/match/v1/pairs/{pair_id}/accept',
    'PAIR_REJECT_PATH': '/match/v1/pairs/{pair_id}/reject',

    # Chat
    'ROOMS_PATH': '/chat/v1/rooms',
    'ROOM_DETAIL_PATH': '/chat/v1/rooms/{room_id}',
    'ROOM_MESSAGES_PATH': '/chat/v1/rooms/{room_id}/messages',

    # Payments
    'INVOICES_PATH': '/payments/v1/invoices',
    'INVOICE_DETAIL_PATH': '/payments/v1/invoices/{inv_id}',
    'INVOICE_MARK_PAID_PATH': '/payments/v1/invoices/{inv_id}/mark-paid',
}
|
||
|
||
def EP(key: str) -> str:
    """Resolve the URL path for endpoint *key*.

    The environment variable ``API_<key>`` wins when it is set; otherwise the
    entry from ``EP_DEFAULTS`` is used. An unknown *key* raises ``KeyError``.
    """
    fallback = EP_DEFAULTS[key]
    return os.environ.get('API_' + key, fallback)
|
||
|
||
|
||
# ===== API base with auto-detection =====
# Resolved API base URL. Filled in on first use by _get_api_base_url() and
# reassigned by request_api() when it fails over after a 404.
_API_BASE_CACHE: Optional[str] = None
# Label of the source the current base was taken from.
_API_LAST_SELECT_SRC = 'DEFAULT'  # for logs
|
||
|
||
def _detect_api_base_from_openapi() -> Optional[str]:
    """Return ``servers[0].url`` from the first usable ``openapi.json``.

    Locations are probed in order: the ``API_SPEC_PATH`` environment variable,
    ``settings.API_SPEC_PATH``, ``<BASE_DIR>/openapi.json``,
    ``<BASE_DIR>/agency/openapi.json`` and ``/mnt/data/openapi.json``.
    A file that cannot be read or parsed is skipped (logged at debug level
    when ``API_DEBUG`` is on). Returns ``None`` when no file yields a URL.
    """
    spec_paths = [
        os.environ.get('API_SPEC_PATH'),
        getattr(settings, 'API_SPEC_PATH', None),
    ]
    project_root = getattr(settings, 'BASE_DIR', None)
    if project_root:
        spec_paths.append(os.path.join(project_root, 'openapi.json'))
        spec_paths.append(os.path.join(project_root, 'agency', 'openapi.json'))
    spec_paths.append('/mnt/data/openapi.json')

    for spec_path in spec_paths:
        if not spec_path or not os.path.isfile(spec_path):
            continue
        try:
            with open(spec_path, 'r', encoding='utf-8') as fh:
                spec = json.load(fh)
            servers = spec.get('servers') or []
            first_server = servers[0] if servers else None
            if isinstance(first_server, dict):
                server_url = first_server.get('url')
                if server_url:
                    return server_url
        except Exception as exc:
            # A broken spec file must not stop the search through the rest.
            if API_DEBUG:
                logger.debug('API: cannot read OpenAPI from %s: %s', spec_path, exc)
    return None
|
||
|
||
|
||
def _get_api_base_url() -> str:
    """Return the API base URL, resolving and caching it on first use.

    Sources, in priority order:
      1) environment / settings: ``API_BASE_URL``, then ``BASE_URL``
      2) ``servers[0].url`` from openapi.json
      3) ``'http://localhost:8080'``

    Trailing slashes are stripped. The result is stored in ``_API_BASE_CACHE``
    and the source label in ``_API_LAST_SELECT_SRC``.
    """
    global _API_BASE_CACHE, _API_LAST_SELECT_SRC
    if _API_BASE_CACHE:
        return _API_BASE_CACHE

    # For each name the environment variable is consulted before the setting.
    configured = ''
    for name in ('API_BASE_URL', 'BASE_URL'):
        configured = os.environ.get(name) or getattr(settings, name, '')
        if configured:
            break

    if configured:
        chosen, source = configured.rstrip('/'), 'ENV/SETTINGS'
    else:
        from_spec = _detect_api_base_from_openapi()
        if from_spec:
            chosen, source = from_spec.rstrip('/'), 'OPENAPI'
        else:
            chosen, source = 'http://localhost:8080', 'HARDCODED'

    _API_BASE_CACHE = chosen
    _API_LAST_SELECT_SRC = source

    if API_DEBUG:
        logger.info("API base selected [%s]: %s", _API_LAST_SELECT_SRC, _API_BASE_CACHE)
    return _API_BASE_CACHE
|
||
|
||
|
||
class ApiError(Exception):
    """Failure reported by, or while talking to, the backend API.

    Attributes:
        status: Status code passed by the raiser.
        payload: Decoded error body; an empty dict when none was supplied.
        req_id: Correlation id of the request, if any.
    """

    def __init__(self, status: int, message: str = 'API error', payload: Optional[dict] = None, req_id: Optional[str] = None):
        # Tag the message with the request id once, so it can be matched to the logs.
        needs_tag = bool(req_id) and bool(message) and 'req_id=' not in message
        text = f"{message} (req_id={req_id})" if needs_tag else message
        super().__init__(text)
        self.status = status
        self.payload = payload if payload else {}
        self.req_id = req_id
|
||
|
||
|
||
def _base_headers(request, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Build the default headers for a backend call.

    The access token is taken from the session first and from the cookies
    second, so that the "cookie mode" works without extra configuration.
    An API key from ``settings.API_KEY`` (or the ``API_KEY`` environment
    variable) is sent as ``X-API-Key``. Entries of *extra* are applied last
    and override the defaults.
    """
    access_token = request.session.get('access_token') or request.COOKIES.get('access_token')
    api_key = getattr(settings, 'API_KEY', '') or os.environ.get('API_KEY', '')

    result: Dict[str, str] = {'Accept': 'application/json'}
    if access_token:
        result['Authorization'] = f'Bearer {access_token}'
    if api_key:
        result['X-API-Key'] = api_key
    result.update(extra or {})
    return result
|
||
|
||
|
||
def _url(path: str) -> str:
    """Join the current API base URL with *path*, adding a leading '/' if missing."""
    root = _get_api_base_url()
    suffix = path if path.startswith('/') else '/' + path
    return root + suffix
|
||
|
||
|
||
def request_api(
    request,
    method: str,
    path: str,
    *,
    params: Optional[dict] = None,
    json: Optional[dict] = None,
    files: Optional[dict] = None,
    data: Optional[dict] = None,
) -> Tuple[int, Any]:
    """Perform one HTTP call against the backend API.

    Behaviour:
      - the token / API key go into the headers (token read from the session
        or the cookies, see ``_base_headers``);
      - on 404 the call is retried against alternative bases
        (``servers[0].url`` from openapi.json, then ``http://localhost:8080``);
        the process-wide base is switched only if a candidate answers with
        something other than 404, otherwise the previous base is restored;
      - on 401 the refresh token (session or cookies) is exchanged for a new
        access token and the call is retried once;
      - requests and responses are logged when ``API_DEBUG`` is on; there is
        no "retry with a trailing slash".

    Args:
        request: Object exposing ``session`` and ``COOKIES`` (used to read and
            store tokens).
        method: HTTP verb (upper-cased before sending).
        path: Endpoint path appended to the API base.
        params: Query parameters.
        json: JSON body.
        files: Multipart files; seekable streams are rewound before a retry.
        data: Form body.

    Returns:
        ``(status_code, payload)``; ``payload`` is the decoded JSON body, or
        ``{}`` when the response is not JSON.

    Raises:
        ApiError: network failure (status 0), refresh network failure
            (status 401) or any non-2xx response other than 401/403.
        PermissionDenied: the final response is 401 or 403.
    """
    global _API_BASE_CACHE, _API_LAST_SELECT_SRC

    # The ``json`` keyword argument (kept for caller compatibility) hides the
    # ``json`` module inside this function, so bind the module to another name.
    import json as json_lib

    req_id = uuid.uuid4().hex[:8]
    base_before = _get_api_base_url()
    url = _url(path)
    timeout = float(getattr(settings, 'API_TIMEOUT', 8.0))

    def _rewind_uploads() -> None:
        """Reset upload streams so that a retry re-sends the whole content."""
        if not isinstance(files, dict):
            return
        for spec in files.values():
            # requests accepts either a bare file object or (name, fileobj, ...).
            stream = spec[1] if isinstance(spec, (tuple, list)) and len(spec) > 1 else spec
            seek = getattr(stream, 'seek', None)
            if callable(seek):
                try:
                    seek(0)
                except (OSError, ValueError) as exc:
                    logger.debug('API[req_id=%s] cannot rewind upload stream: %s', req_id, exc)

    def _do(target_url: str):
        """Send the request to *target_url*; return ``(response, payload)``."""
        headers = _base_headers(request)
        if json is not None and files is None and data is None:
            headers['Content-Type'] = 'application/json'

        if API_DEBUG:
            log_headers = _sanitize(headers) if API_LOG_HEADERS else {}
            log_body = _sanitize(json if json is not None else data)
            if API_LOG_CURL:
                try:
                    curl = _build_curl(method, target_url, headers, params, json, data)
                    logger.debug("API[req_id=%s] cURL:\n%s", req_id, curl)
                except Exception:
                    # Purely diagnostic output: never fail the call over it.
                    pass
            logger.info(
                "API[req_id=%s] REQUEST %s %s params=%s headers=%s body=%s",
                req_id, method.upper(), target_url, _sanitize(params), log_headers, log_body
            )

        t0 = time.time()
        resp = requests.request(
            method=method.upper(),
            url=target_url,
            headers=headers,
            params=params,
            json=json,
            data=data,
            files=files,
            timeout=timeout,
        )
        dt = int((time.time() - t0) * 1000)

        content_type = resp.headers.get('Content-Type', '')
        try:
            payload = resp.json() if 'application/json' in content_type else {}
        except ValueError:
            payload = {}

        if API_DEBUG:
            try:
                body_str = json_lib.dumps(_sanitize(payload), ensure_ascii=False)
            except (TypeError, ValueError):
                body_str = str(_sanitize(payload))
            headers_out = _sanitize(dict(resp.headers)) if API_LOG_HEADERS else {}
            logger.info(
                "API[req_id=%s] RESPONSE %s %sms ct=%s headers=%s body=%s",
                req_id, resp.status_code, dt, content_type, headers_out, _shorten(body_str, API_LOG_BODY_MAX)
            )

        return resp, payload

    # 1) First attempt
    try:
        resp, payload = _do(url)
    except requests.RequestException as e:
        logger.exception('API[req_id=%s] network error: %s', req_id, e)
        raise ApiError(0, f'Network unavailable or timeout when accessing API ({e})', req_id=req_id) from e

    # 2) 404 -> try alternative bases (openapi -> 8080) and repeat
    if resp.status_code == 404:
        original_base, original_src = _API_BASE_CACHE, _API_LAST_SELECT_SRC
        candidates: List[Tuple[str, str]] = []
        if API_FALLBACK_OPENAPI_ON_404:
            detected = _detect_api_base_from_openapi()
            if detected:
                candidates.append((detected.rstrip('/'), 'OPENAPI(FAILOVER)'))
        # NOTE(review): the flattened source does not show whether this append
        # sits inside the flag check above; it is kept unconditional here.
        candidates.append(('http://localhost:8080', 'DEFAULT(FAILOVER)'))

        switched = False
        tried = {original_base}
        for cand_base, label in candidates:
            if not cand_base or cand_base in tried:
                continue
            tried.add(cand_base)
            if API_DEBUG:
                logger.warning("API[req_id=%s] 404 on base %s → switch API base to %s and retry",
                               req_id, _API_BASE_CACHE, cand_base)
            # _url() reads the cached base, so point it at the candidate.
            _API_BASE_CACHE = cand_base
            _API_LAST_SELECT_SRC = label
            _rewind_uploads()
            try:
                cand_resp, cand_payload = _do(_url(path))
            except requests.RequestException as e:
                logger.warning("API[req_id=%s] failover base %s unreachable: %s", req_id, cand_base, e)
                continue
            if cand_resp.status_code != 404:
                resp, payload = cand_resp, cand_payload
                switched = True
                break

        if not switched:
            # A genuine "not found" must not redirect every later call of the
            # process to a base that did not work either.
            _API_BASE_CACHE, _API_LAST_SELECT_SRC = original_base, original_src

    # 3) 401 -> refresh and repeat
    refresh_token = request.session.get('refresh_token') or request.COOKIES.get('refresh_token')
    if resp.status_code == 401 and refresh_token:
        if API_DEBUG:
            logger.info("API[req_id=%s] 401 → try refresh token", req_id)
        try:
            refresh_url = _url(EP('AUTH_REFRESH_PATH'))
            refresh_body = {'refresh_token': refresh_token}
            logger.info("API[req_id=%s] REFRESH POST %s body=%s", req_id, refresh_url, _sanitize(refresh_body))
            refresh_resp = requests.post(refresh_url, json=refresh_body, timeout=timeout)
            if refresh_resp.status_code == 200:
                try:
                    rj = refresh_resp.json()
                except ValueError:
                    rj = {}
                if not isinstance(rj, dict):
                    rj = {}
                # NOTE(review): nesting reconstructed from flattened source —
                # the retry happens only when a new access_token was returned;
                # confirm against the intended behaviour.
                if rj.get('access_token'):
                    request.session['access_token'] = rj['access_token']
                    if rj.get('refresh_token'):
                        request.session['refresh_token'] = rj['refresh_token']
                    request.session.modified = True
                    if API_DEBUG:
                        logger.info("API[req_id=%s] REFRESH OK → retry original request", req_id)
                    _rewind_uploads()
                    resp, payload = _do(_url(path))
            else:
                logger.warning("API[req_id=%s] REFRESH failed: %s", req_id, refresh_resp.status_code)
        except requests.RequestException as e:
            logger.exception('API[req_id=%s] Refresh token network error: %s', req_id, e)
            raise ApiError(401, f'Token refresh failed ({e})', req_id=req_id) from e

    # 4) Errors
    if not (200 <= resp.status_code < 300):
        msg = None
        if isinstance(payload, dict):
            msg = payload.get('detail') or payload.get('message')
        msg = msg or f'API error: {resp.status_code}'
        if resp.status_code in (401, 403):
            # PermissionDenied is handled in the view (it does not always mean "log out and log in again")
            raise PermissionDenied(f"{msg} (req_id={req_id})")
        raise ApiError(resp.status_code, msg, payload if isinstance(payload, dict) else {}, req_id=req_id)

    # 5) The base changed — note it
    base_after = _get_api_base_url()
    if API_DEBUG and base_before != base_after:
        logger.warning("API[req_id=%s] BASE SWITCHED: %s → %s", req_id, base_before, base_after)

    return resp.status_code, payload
|
||
|
||
|
||
# ==========================
|
||
# AUTH
|
||
# ==========================
|
||
|
||
def register_user(request, email: str, password: str, full_name: Optional[str] = None, role: str = 'CLIENT') -> Dict[str, Any]:
    """POST a registration request to the AUTH_REGISTER_PATH endpoint.

    Returns the decoded response payload (noted as ``UserRead`` by the author).
    """
    registration = dict(email=email, password=password, full_name=full_name, role=role)
    _status, user = request_api(request, 'POST', EP('AUTH_REGISTER_PATH'), json=registration)
    return user
|
||
|
||
def login(request, email: str, password: str) -> Dict[str, Any]:
    """POST the credentials to the AUTH_TOKEN_PATH endpoint.

    Returns the decoded response payload (noted as ``TokenPair`` by the author).
    """
    credentials = dict(email=email, password=password)
    _status, token_pair = request_api(request, 'POST', EP('AUTH_TOKEN_PATH'), json=credentials)
    return token_pair
|
||
|
||
def get_current_user(request) -> Dict[str, Any]:
    """GET the ME_PATH endpoint and return its payload (noted as ``UserRead``)."""
    return request_api(request, 'GET', EP('ME_PATH'))[1]
|
||
|
||
def list_users(request, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """GET the USERS_LIST_PATH endpoint with ``offset``/``limit`` query parameters."""
    paging = dict(offset=offset, limit=limit)
    _status, users = request_api(request, 'GET', EP('USERS_LIST_PATH'), params=paging)
    return users
|
||
|
||
def get_user(request, user_id: str) -> Dict[str, Any]:
    """GET the USER_DETAIL_PATH endpoint for *user_id* and return its payload."""
    return request_api(request, 'GET', EP('USER_DETAIL_PATH').format(user_id=user_id))[1]
|
||
|
||
def update_user(request, user_id: str, **fields) -> Dict[str, Any]:
    """PATCH the USER_DETAIL_PATH endpoint for *user_id*; *fields* form the JSON body."""
    target = EP('USER_DETAIL_PATH').format(user_id=user_id)
    _status, updated = request_api(request, 'PATCH', target, json=fields)
    return updated
|
||
|
||
def delete_user(request, user_id: str) -> None:
    """Send DELETE to the USER_DETAIL_PATH endpoint for *user_id*; the response is discarded."""
    request_api(request, 'DELETE', EP('USER_DETAIL_PATH').format(user_id=user_id))
|
||
|
||
|
||
# ==========================
|
||
# PROFILES
|
||
# ==========================
|
||
|
||
def get_my_profile(request) -> Dict[str, Any]:
    """GET the PROFILE_ME_PATH endpoint and return its payload (noted as ``ProfileOut``)."""
    return request_api(request, 'GET', EP('PROFILE_ME_PATH'))[1]
|
||
|
||
def create_my_profile(request, gender: str, city: str, languages: List[str], interests: List[str]) -> Dict[str, Any]:
    """POST a new profile to the PROFILES_CREATE_PATH endpoint.

    Returns the decoded response payload (noted as ``ProfileOut`` by the author).
    """
    profile = dict(gender=gender, city=city, languages=languages, interests=interests)
    _status, created = request_api(request, 'POST', EP('PROFILES_CREATE_PATH'), json=profile)
    return created
|
||
|
||
|
||
# ==========================
|
||
# PAIRS
|
||
# ==========================
|
||
|
||
def create_pair(request, user_id_a: str, user_id_b: str, score: Optional[float] = None, notes: Optional[str] = None) -> Dict[str, Any]:
    """POST a new pair to the PAIRS_PATH endpoint.

    ``score`` and ``notes`` are sent even when ``None``. Returns the decoded
    response payload (noted as ``PairRead`` by the author).
    """
    pair = dict(user_id_a=user_id_a, user_id_b=user_id_b, score=score, notes=notes)
    _status, created = request_api(request, 'POST', EP('PAIRS_PATH'), json=pair)
    return created
|
||
|
||
def list_pairs(request, for_user_id: Optional[str] = None, status: Optional[str] = None, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """GET the PAIRS_PATH endpoint; filters left as ``None`` are not sent."""
    query = {}
    for name, value in (('for_user_id', for_user_id), ('status', status),
                        ('offset', offset), ('limit', limit)):
        if value is not None:
            query[name] = value
    _status, pairs = request_api(request, 'GET', EP('PAIRS_PATH'), params=query)
    return pairs
|
||
|
||
def get_pair(request, pair_id: str) -> Dict[str, Any]:
    """GET the PAIR_DETAIL_PATH endpoint for *pair_id* and return its payload."""
    return request_api(request, 'GET', EP('PAIR_DETAIL_PATH').format(pair_id=pair_id))[1]
|
||
|
||
def update_pair(request, pair_id: str, **fields) -> Dict[str, Any]:
    """PATCH the PAIR_DETAIL_PATH endpoint for *pair_id*; *fields* form the JSON body."""
    target = EP('PAIR_DETAIL_PATH').format(pair_id=pair_id)
    _status, updated = request_api(request, 'PATCH', target, json=fields)
    return updated
|
||
|
||
def delete_pair(request, pair_id: str) -> None:
    """Send DELETE to the PAIR_DETAIL_PATH endpoint for *pair_id*; the response is discarded."""
    request_api(request, 'DELETE', EP('PAIR_DETAIL_PATH').format(pair_id=pair_id))
|
||
|
||
def accept_pair(request, pair_id: str) -> Dict[str, Any]:
    """POST (without a body) to the PAIR_ACCEPT_PATH endpoint for *pair_id*."""
    return request_api(request, 'POST', EP('PAIR_ACCEPT_PATH').format(pair_id=pair_id))[1]
|
||
|
||
def reject_pair(request, pair_id: str) -> Dict[str, Any]:
    """POST (without a body) to the PAIR_REJECT_PATH endpoint for *pair_id*."""
    return request_api(request, 'POST', EP('PAIR_REJECT_PATH').format(pair_id=pair_id))[1]
|
||
|
||
|
||
# ==========================
|
||
# CHAT
|
||
# ==========================
|
||
|
||
def my_rooms(request) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """GET the ROOMS_PATH endpoint and return its payload."""
    return request_api(request, 'GET', EP('ROOMS_PATH'))[1]
|
||
|
||
def get_room(request, room_id: str) -> Dict[str, Any]:
    """GET the ROOM_DETAIL_PATH endpoint for *room_id* and return its payload."""
    return request_api(request, 'GET', EP('ROOM_DETAIL_PATH').format(room_id=room_id))[1]
|
||
|
||
def create_room(request, title: Optional[str] = None, participants: Optional[List[str]] = None) -> Dict[str, Any]:
    """POST a new chat room to the ROOMS_PATH endpoint.

    Args:
        request: Request object forwarded to ``request_api``.
        title: Optional room title (sent even when ``None``).
        participants: Participant entries for the room. Omitting the argument
            sends an empty list; a fresh list is created per call instead of
            sharing one mutable default object between calls.

    Returns:
        The decoded response payload.
    """
    body = {'title': title, 'participants': [] if participants is None else participants}
    _, data = request_api(request, 'POST', EP('ROOMS_PATH'), json=body)
    return data
|
||
|
||
def send_message(request, room_id: str, content: str) -> Dict[str, Any]:
    """POST ``{'content': content}`` to the ROOM_MESSAGES_PATH endpoint for *room_id*."""
    message = dict(content=content)
    target = EP('ROOM_MESSAGES_PATH').format(room_id=room_id)
    return request_api(request, 'POST', target, json=message)[1]
|
||
|
||
def list_messages(request, room_id: str, offset: int = 0, limit: int = 100) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """GET the ROOM_MESSAGES_PATH endpoint for *room_id* with ``offset``/``limit`` query parameters."""
    paging = dict(offset=offset, limit=limit)
    target = EP('ROOM_MESSAGES_PATH').format(room_id=room_id)
    return request_api(request, 'GET', target, params=paging)[1]
|
||
|
||
|
||
# ==========================
|
||
# PAYMENTS
|
||
# ==========================
|
||
|
||
def create_invoice(request, client_id: str, amount: float, currency: str, description: Optional[str] = None) -> Dict[str, Any]:
    """POST a new invoice to the INVOICES_PATH endpoint.

    ``description`` is sent even when ``None``. Returns the decoded payload.
    """
    invoice = dict(client_id=client_id, amount=amount, currency=currency, description=description)
    _status, created = request_api(request, 'POST', EP('INVOICES_PATH'), json=invoice)
    return created
|
||
|
||
def list_invoices(request, client_id: Optional[str] = None, status: Optional[str] = None, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """GET the INVOICES_PATH endpoint; filters left as ``None`` are not sent."""
    query = {}
    for name, value in (('client_id', client_id), ('status', status),
                        ('offset', offset), ('limit', limit)):
        if value is not None:
            query[name] = value
    _status, invoices = request_api(request, 'GET', EP('INVOICES_PATH'), params=query)
    return invoices
|
||
|
||
def get_invoice(request, inv_id: str) -> Dict[str, Any]:
    """GET the INVOICE_DETAIL_PATH endpoint for *inv_id* and return its payload."""
    return request_api(request, 'GET', EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id))[1]
|
||
|
||
def update_invoice(request, inv_id: str, **fields) -> Dict[str, Any]:
    """PATCH the INVOICE_DETAIL_PATH endpoint for *inv_id*; *fields* form the JSON body."""
    target = EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id)
    _status, updated = request_api(request, 'PATCH', target, json=fields)
    return updated
|
||
|
||
def delete_invoice(request, inv_id: str) -> None:
    """Send DELETE to the INVOICE_DETAIL_PATH endpoint for *inv_id*; the response is discarded."""
    request_api(request, 'DELETE', EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id))
|
||
|
||
def mark_invoice_paid(request, inv_id: str) -> Dict[str, Any]:
    """POST (without a body) to the INVOICE_MARK_PAID_PATH endpoint for *inv_id*."""
    return request_api(request, 'POST', EP('INVOICE_MARK_PAID_PATH').format(inv_id=inv_id))[1]
|
||
|
||
def upload_my_photo(request, file_obj) -> Dict[str, Any]:
    """
    Send multipart/form-data to the backend to upload the profile photo.

    The file goes in the ``'file'`` field. Its name and content type are read
    from ``file_obj.name`` / ``file_obj.content_type``, defaulting to
    ``'photo.jpg'`` and ``'application/octet-stream'``. The server is expected
    to return the updated profile or ``{photo_url: "..."}``.
    """
    upload_name = getattr(file_obj, 'name', 'photo.jpg')
    mime_type = getattr(file_obj, 'content_type', 'application/octet-stream')
    multipart = {'file': (upload_name, file_obj, mime_type)}
    return request_api(request, 'POST', EP('PROFILE_PHOTO_UPLOAD_PATH'), files=multipart)[1]
|
||
|
||
def delete_my_photo(request) -> Dict[str, Any]:
    """
    Delete the profile photo (if the server supports DELETE on the same path).

    Returns the decoded response payload.
    """
    return request_api(request, 'DELETE', EP('PROFILE_PHOTO_DELETE_PATH'))[1]