init commit

This commit is contained in:
2025-08-10 15:31:47 +09:00
commit 377edb22d8
25 changed files with 1922 additions and 0 deletions

0
ui/__init__.py Normal file
View File

527
ui/api.py Normal file
View File

@@ -0,0 +1,527 @@
import logging
import os
import os.path
import json
import time
import uuid
from typing import Any, Dict, Optional, Tuple, List, Union
import requests
from django.conf import settings
from django.core.exceptions import PermissionDenied
logger = logging.getLogger(__name__)
# ===== Логирование / флаги =====
API_DEBUG = os.environ.get('API_DEBUG', '1') == '1'
API_LOG_BODY_MAX = int(os.environ.get('API_LOG_BODY_MAX', '2000'))
API_LOG_HEADERS = os.environ.get('API_LOG_HEADERS', '1') == '1'
API_LOG_CURL = os.environ.get('API_LOG_CURL', '0') == '1'
# Переключение базы при 404: сначала servers[0].url из openapi.json, потом жёстко http://localhost:8080
API_FALLBACK_OPENAPI_ON_404 = os.environ.get('API_FALLBACK_OPENAPI_ON_404', '1') == '1'
SENSITIVE_KEYS = {'password', 'refresh_token', 'access_token', 'authorization', 'token', 'api_key'}
def _sanitize(obj: Any) -> Any:
try:
if isinstance(obj, dict):
return {k: ('***' if isinstance(k, str) and k.lower() in SENSITIVE_KEYS else _sanitize(v))
for k, v in obj.items()}
if isinstance(obj, list):
return [_sanitize(x) for x in obj]
return obj
except Exception:
return obj
def _shorten(s: str, limit: int) -> str:
if s is None:
return ''
return s if len(s) <= limit else (s[:limit] + f'... <truncated {len(s)-limit} chars>')
def _build_curl(method: str, url: str, headers: Dict[str, str],
params: Optional[dict], json_body: Optional[dict], data: Optional[dict]) -> str:
parts = [f"curl -X '{method.upper()}' \\\n '{url}'"]
for k, v in headers.items():
if k.lower() == 'authorization':
v = 'Bearer ***'
parts.append(f" -H '{k}: {v}'")
if json_body is not None:
parts.append(" -H 'Content-Type: application/json'")
try:
body_str = json.dumps(_sanitize(json_body), ensure_ascii=False)
except Exception:
body_str = str(_sanitize(json_body))
parts.append(f" -d '{body_str}'")
elif data is not None:
try:
body_str = json.dumps(_sanitize(data), ensure_ascii=False)
except Exception:
body_str = str(_sanitize(data))
parts.append(f" --data-raw '{body_str}'")
return " \\\n".join(parts)
# ===== Пути эндпоинтов (как в swagger) =====
EP_DEFAULTS: Dict[str, str] = {
# Auth
'AUTH_REGISTER_PATH': '/auth/v1/register',
'AUTH_TOKEN_PATH': '/auth/v1/token',
'AUTH_REFRESH_PATH': '/auth/v1/refresh',
'ME_PATH': '/auth/v1/me',
'USERS_LIST_PATH': '/auth/v1/users',
'USER_DETAIL_PATH': '/auth/v1/users/{user_id}',
# Profiles
'PROFILE_ME_PATH': '/profiles/v1/profiles/me',
'PROFILES_CREATE_PATH': '/profiles/v1/profiles',
'PROFILE_PHOTO_UPLOAD_PATH': '/profiles/v1/profiles/me/photo',
'PROFILE_PHOTO_DELETE_PATH': '/profiles/v1/profiles/me/photo',
# Pairs
'PAIRS_PATH': '/match/v1/pairs',
'PAIR_DETAIL_PATH': '/match/v1/pairs/{pair_id}',
'PAIR_ACCEPT_PATH': '/match/v1/pairs/{pair_id}/accept',
'PAIR_REJECT_PATH': '/match/v1/pairs/{pair_id}/reject',
# Chat
'ROOMS_PATH': '/chat/v1/rooms',
'ROOM_DETAIL_PATH': '/chat/v1/rooms/{room_id}',
'ROOM_MESSAGES_PATH': '/chat/v1/rooms/{room_id}/messages',
# Payments
'INVOICES_PATH': '/payments/v1/invoices',
'INVOICE_DETAIL_PATH': '/payments/v1/invoices/{inv_id}',
'INVOICE_MARK_PAID_PATH': '/payments/v1/invoices/{inv_id}/mark-paid',
}
def EP(key: str) -> str:
return os.environ.get(f'API_{key}', EP_DEFAULTS[key])
# ===== База API с авто‑детектом =====
_API_BASE_CACHE: Optional[str] = None
_API_LAST_SELECT_SRC = 'DEFAULT' # для логов
def _detect_api_base_from_openapi() -> Optional[str]:
"""
Берём servers[0].url из openapi.json (по схеме — http://localhost:8080).
"""
candidates = [
os.environ.get('API_SPEC_PATH'),
getattr(settings, 'API_SPEC_PATH', None),
os.path.join(getattr(settings, 'BASE_DIR', ''), 'openapi.json') if getattr(settings, 'BASE_DIR', None) else None,
os.path.join(getattr(settings, 'BASE_DIR', ''), 'agency', 'openapi.json') if getattr(settings, 'BASE_DIR', None) else None,
'/mnt/data/openapi.json',
]
for p in candidates:
if p and os.path.isfile(p):
try:
with open(p, 'r', encoding='utf-8') as f:
spec = json.load(f)
servers = spec.get('servers') or []
if servers and isinstance(servers[0], dict):
url = servers[0].get('url')
if url:
return url
except Exception as e:
if API_DEBUG:
logger.debug('API: cannot read OpenAPI from %s: %s', p, e)
return None
def _get_api_base_url() -> str:
"""
Источники (по приоритету):
1) ENV/Settings: API_BASE_URL, затем BASE_URL
2) servers[0].url из openapi.json
3) 'http://localhost:8080'
"""
global _API_BASE_CACHE, _API_LAST_SELECT_SRC
if _API_BASE_CACHE:
return _API_BASE_CACHE
base = (os.environ.get('API_BASE_URL') or getattr(settings, 'API_BASE_URL', '')
or os.environ.get('BASE_URL') or getattr(settings, 'BASE_URL', ''))
if base:
_API_BASE_CACHE = base.rstrip('/')
_API_LAST_SELECT_SRC = 'ENV/SETTINGS'
else:
detected = _detect_api_base_from_openapi()
if detected:
_API_BASE_CACHE = detected.rstrip('/')
_API_LAST_SELECT_SRC = 'OPENAPI'
else:
_API_BASE_CACHE = 'http://localhost:8080'
_API_LAST_SELECT_SRC = 'HARDCODED'
if API_DEBUG:
logger.info("API base selected [%s]: %s", _API_LAST_SELECT_SRC, _API_BASE_CACHE)
return _API_BASE_CACHE
class ApiError(Exception):
def __init__(self, status: int, message: str = 'API error', payload: Optional[dict] = None, req_id: Optional[str] = None):
if req_id and message and 'req_id=' not in message:
message = f"{message} (req_id={req_id})"
super().__init__(message)
self.status = status
self.payload = payload or {}
self.req_id = req_id
def _base_headers(request, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""
Достаём токен сначала из сессии, затем из куки — чтобы «куки‑режим» работал без доп. настроек.
"""
headers: Dict[str, str] = {'Accept': 'application/json'}
token = request.session.get('access_token') or request.COOKIES.get('access_token')
if token:
headers['Authorization'] = f'Bearer {token}'
api_key = getattr(settings, 'API_KEY', '') or os.environ.get('API_KEY', '')
if api_key:
headers['X-API-Key'] = api_key
if extra:
headers.update(extra)
return headers
def _url(path: str) -> str:
base = _get_api_base_url()
path = path if path.startswith('/') else '/' + path
return base + path
def request_api(
request,
method: str,
path: str,
*,
params: Optional[dict] = None,
json: Optional[dict] = None,
files: Optional[dict] = None,
data: Optional[dict] = None,
) -> Tuple[int, Any]:
"""
Универсальный HTTP-вызов (упрощённая версия):
- токен/ключ в заголовках (токен берём из сессии или куки),
- auto-refresh при 401 (refresh берём тоже из сессии или куки),
- 404 → переключаем базу: servers[0].url из openapi.json → 'http://localhost:8080',
- подробные логи; БЕЗ «retry со слэшем».
"""
global _API_BASE_CACHE, _API_LAST_SELECT_SRC
req_id = uuid.uuid4().hex[:8]
base_before = _get_api_base_url()
url = _url(path)
def _do(_url: str):
headers = _base_headers(request)
if json is not None and files is None and data is None:
headers['Content-Type'] = 'application/json'
if API_DEBUG:
log_headers = _sanitize(headers) if API_LOG_HEADERS else {}
log_body = _sanitize(json if json is not None else data)
if API_LOG_CURL:
try:
curl = _build_curl(method, _url, headers, params, json, data)
logger.debug("API[req_id=%s] cURL:\n%s", req_id, curl)
except Exception:
pass
logger.info(
"API[req_id=%s] REQUEST %s %s params=%s headers=%s body=%s",
req_id, method.upper(), _url, _sanitize(params), log_headers, log_body
)
t0 = time.time()
resp = requests.request(
method=method.upper(),
url=_url,
headers=headers,
params=params,
json=json,
data=data,
files=files,
timeout=float(getattr(settings, 'API_TIMEOUT', 8.0)),
)
dt = int((time.time() - t0) * 1000)
content_type = resp.headers.get('Content-Type', '')
try:
payload = resp.json() if 'application/json' in content_type else {}
except ValueError:
payload = {}
if API_DEBUG:
body_str = ""
try:
body_str = json.dumps(_sanitize(payload), ensure_ascii=False)
except Exception:
body_str = str(_sanitize(payload))
headers_out = _sanitize(dict(resp.headers)) if API_LOG_HEADERS else {}
logger.info(
"API[req_id=%s] RESPONSE %s %sms ct=%s headers=%s body=%s",
req_id, resp.status_code, dt, content_type, headers_out, _shorten(body_str, API_LOG_BODY_MAX)
)
return resp, payload
# 1) Первый запрос
try:
resp, payload = _do(url)
except requests.RequestException as e:
logger.exception('API[req_id=%s] network error: %s', req_id, e)
raise ApiError(0, f'Network unavailable or timeout when accessing API ({e})', req_id=req_id)
# 2) 404 → переключаем базу (openapi → 8080) и повторяем
if resp.status_code == 404:
candidates: List[tuple[str, str]] = []
if API_FALLBACK_OPENAPI_ON_404:
detected = _detect_api_base_from_openapi()
if detected:
candidates.append((detected.rstrip('/'), 'OPENAPI(FAILOVER)'))
candidates.append(('http://localhost:8080', 'DEFAULT(FAILOVER)'))
for cand_base, label in candidates:
if not cand_base or cand_base == _API_BASE_CACHE:
continue
if API_DEBUG:
logger.warning("API[req_id=%s] 404 on base %s → switch API base to %s and retry",
req_id, _API_BASE_CACHE, cand_base)
_API_BASE_CACHE = cand_base
_API_LAST_SELECT_SRC = label
try:
resp, payload = _do(_url(path))
if resp.status_code != 404:
break
except requests.RequestException:
continue
# 3) 401 → refresh и повтор
refresh_token = request.session.get('refresh_token') or request.COOKIES.get('refresh_token')
if resp.status_code == 401 and refresh_token:
if API_DEBUG:
logger.info("API[req_id=%s] 401 → try refresh token", req_id)
try:
refresh_url = _url(EP('AUTH_REFRESH_PATH'))
refresh_body = {'refresh_token': refresh_token}
logger.info("API[req_id=%s] REFRESH POST %s body=%s", req_id, refresh_url, _sanitize(refresh_body))
refresh_resp = requests.post(refresh_url, json=refresh_body, timeout=float(getattr(settings, 'API_TIMEOUT', 8.0)))
if refresh_resp.status_code == 200:
try:
rj = refresh_resp.json()
except ValueError:
rj = {}
if rj.get('access_token'):
request.session['access_token'] = rj['access_token']
if rj.get('refresh_token'):
request.session['refresh_token'] = rj['refresh_token']
request.session.modified = True
if API_DEBUG:
logger.info("API[req_id=%s] REFRESH OK → retry original request", req_id)
resp, payload = _do(_url(path))
else:
logger.warning("API[req_id=%s] REFRESH failed: %s", req_id, refresh_resp.status_code)
except requests.RequestException as e:
logger.exception('API[req_id=%s] Refresh token network error: %s', req_id, e)
raise ApiError(401, f'Token refresh failed ({e})', req_id=req_id)
# 4) Ошибки
if not (200 <= resp.status_code < 300):
msg = None
if isinstance(payload, dict):
msg = payload.get('detail') or payload.get('message')
msg = msg or f'API error: {resp.status_code}'
if resp.status_code in (401, 403):
# PermissionDenied обрабатываем во view (не всегда это «выйти и войти заново»)
raise PermissionDenied(f"{msg} (req_id={req_id})")
raise ApiError(resp.status_code, msg, payload if isinstance(payload, dict) else {}, req_id=req_id)
# 5) База сменилась — отметим
base_after = _get_api_base_url()
if API_DEBUG and base_before != base_after:
logger.warning("API[req_id=%s] BASE SWITCHED: %s%s", req_id, base_before, base_after)
return resp.status_code, payload
# ==========================
# AUTH
# ==========================
def register_user(request, email: str, password: str, full_name: Optional[str] = None, role: str = 'CLIENT') -> Dict[str, Any]:
body = {'email': email, 'password': password, 'full_name': full_name, 'role': role}
_, data = request_api(request, 'POST', EP('AUTH_REGISTER_PATH'), json=body)
return data # UserRead
def login(request, email: str, password: str) -> Dict[str, Any]:
body = {'email': email, 'password': password}
_, data = request_api(request, 'POST', EP('AUTH_TOKEN_PATH'), json=body)
return data # TokenPair
def get_current_user(request) -> Dict[str, Any]:
_, data = request_api(request, 'GET', EP('ME_PATH'))
return data # UserRead
def list_users(request, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
params = {'offset': offset, 'limit': limit}
_, data = request_api(request, 'GET', EP('USERS_LIST_PATH'), params=params)
return data
def get_user(request, user_id: str) -> Dict[str, Any]:
path = EP('USER_DETAIL_PATH').format(user_id=user_id)
_, data = request_api(request, 'GET', path)
return data
def update_user(request, user_id: str, **fields) -> Dict[str, Any]:
path = EP('USER_DETAIL_PATH').format(user_id=user_id)
_, data = request_api(request, 'PATCH', path, json=fields)
return data
def delete_user(request, user_id: str) -> None:
path = EP('USER_DETAIL_PATH').format(user_id=user_id)
request_api(request, 'DELETE', path)
# ==========================
# PROFILES
# ==========================
def get_my_profile(request) -> Dict[str, Any]:
_, data = request_api(request, 'GET', EP('PROFILE_ME_PATH'))
return data # ProfileOut
def create_my_profile(request, gender: str, city: str, languages: List[str], interests: List[str]) -> Dict[str, Any]:
body = {'gender': gender, 'city': city, 'languages': languages, 'interests': interests}
_, data = request_api(request, 'POST', EP('PROFILES_CREATE_PATH'), json=body)
return data # ProfileOut
# ==========================
# PAIRS
# ==========================
def create_pair(request, user_id_a: str, user_id_b: str, score: Optional[float] = None, notes: Optional[str] = None) -> Dict[str, Any]:
body = {'user_id_a': user_id_a, 'user_id_b': user_id_b, 'score': score, 'notes': notes}
_, data = request_api(request, 'POST', EP('PAIRS_PATH'), json=body)
return data # PairRead
def list_pairs(request, for_user_id: Optional[str] = None, status: Optional[str] = None, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
params = {'for_user_id': for_user_id, 'status': status, 'offset': offset, 'limit': limit}
params = {k: v for k, v in params.items() if v is not None}
_, data = request_api(request, 'GET', EP('PAIRS_PATH'), params=params)
return data
def get_pair(request, pair_id: str) -> Dict[str, Any]:
path = EP('PAIR_DETAIL_PATH').format(pair_id=pair_id)
_, data = request_api(request, 'GET', path)
return data
def update_pair(request, pair_id: str, **fields) -> Dict[str, Any]:
path = EP('PAIR_DETAIL_PATH').format(pair_id=pair_id)
_, data = request_api(request, 'PATCH', path, json=fields)
return data
def delete_pair(request, pair_id: str) -> None:
path = EP('PAIR_DETAIL_PATH').format(pair_id=pair_id)
request_api(request, 'DELETE', path)
def accept_pair(request, pair_id: str) -> Dict[str, Any]:
path = EP('PAIR_ACCEPT_PATH').format(pair_id=pair_id)
_, data = request_api(request, 'POST', path)
return data
def reject_pair(request, pair_id: str) -> Dict[str, Any]:
path = EP('PAIR_REJECT_PATH').format(pair_id=pair_id)
_, data = request_api(request, 'POST', path)
return data
# ==========================
# CHAT
# ==========================
def my_rooms(request) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
_, data = request_api(request, 'GET', EP('ROOMS_PATH'))
return data
def get_room(request, room_id: str) -> Dict[str, Any]:
path = EP('ROOM_DETAIL_PATH').format(room_id=room_id)
_, data = request_api(request, 'GET', path)
return data
def create_room(request, title: Optional[str] = None, participants: List[str] = []) -> Dict[str, Any]:
body = {'title': title, 'participants': participants}
_, data = request_api(request, 'POST', EP('ROOMS_PATH'), json=body)
return data
def send_message(request, room_id: str, content: str) -> Dict[str, Any]:
body = {'content': content}
path = EP('ROOM_MESSAGES_PATH').format(room_id=room_id)
_, data = request_api(request, 'POST', path, json=body)
return data
def list_messages(request, room_id: str, offset: int = 0, limit: int = 100) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
params = {'offset': offset, 'limit': limit}
path = EP('ROOM_MESSAGES_PATH').format(room_id=room_id)
_, data = request_api(request, 'GET', path, params=params)
return data
# ==========================
# PAYMENTS
# ==========================
def create_invoice(request, client_id: str, amount: float, currency: str, description: Optional[str] = None) -> Dict[str, Any]:
body = {'client_id': client_id, 'amount': amount, 'currency': currency, 'description': description}
_, data = request_api(request, 'POST', EP('INVOICES_PATH'), json=body)
return data
def list_invoices(request, client_id: Optional[str] = None, status: Optional[str] = None, offset: int = 0, limit: int = 50) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
params = {'client_id': client_id, 'status': status, 'offset': offset, 'limit': limit}
params = {k: v for k, v in params.items() if v is not None}
_, data = request_api(request, 'GET', EP('INVOICES_PATH'), params=params)
return data
def get_invoice(request, inv_id: str) -> Dict[str, Any]:
path = EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id)
_, data = request_api(request, 'GET', path)
return data
def update_invoice(request, inv_id: str, **fields) -> Dict[str, Any]:
path = EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id)
_, data = request_api(request, 'PATCH', path, json=fields)
return data
def delete_invoice(request, inv_id: str) -> None:
path = EP('INVOICE_DETAIL_PATH').format(inv_id=inv_id)
request_api(request, 'DELETE', path)
def mark_invoice_paid(request, inv_id: str) -> Dict[str, Any]:
path = EP('INVOICE_MARK_PAID_PATH').format(inv_id=inv_id)
_, data = request_api(request, 'POST', path)
return data
def upload_my_photo(request, file_obj) -> Dict[str, Any]:
"""
Отправляет multipart/form-data на бекенд для загрузки фото профиля.
Ожидаем, что сервер примет поле 'file' и вернёт обновлённый профиль или {photo_url: "..."}.
"""
path = EP('PROFILE_PHOTO_UPLOAD_PATH')
filename = getattr(file_obj, 'name', 'photo.jpg')
content_type = getattr(file_obj, 'content_type', 'application/octet-stream')
files = {'file': (filename, file_obj, content_type)}
_, data = request_api(request, 'POST', path, files=files)
return data
def delete_my_photo(request) -> Dict[str, Any]:
"""
Удаляет фото профиля (если сервер поддерживает DELETE на том же пути).
"""
path = EP('PROFILE_PHOTO_DELETE_PATH')
_, data = request_api(request, 'DELETE', path)
return data

11
ui/context_processors.py Normal file
View File

@@ -0,0 +1,11 @@
from django.conf import settings
def public_settings(request):
return {
'DEBUG': settings.DEBUG,
}
def current_user(request):
# Very lightweight 'user' from API token in session
auth = request.session.get('auth', {})
return {'api_user': auth.get('user')}

20
ui/urls.py Normal file
View File

@@ -0,0 +1,20 @@
from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'),
# Кабинет
path("cabinet/", views.cabinet_view, name="cabinet"),
path("cabinet/photo/upload/", views.cabinet_upload_photo, name="cabinet_upload_photo"),
path("cabinet/photo/delete/", views.cabinet_delete_photo, name="cabinet_delete_photo"),
# Каталог
path("profiles/", views.profile_list, name="profiles"),
path("profiles/<uuid:pk>/", views.profile_detail, name="profile_detail"),
path("profiles/<uuid:pk>/like/", views.like_profile, name="like_profile"),
# Регистрация и авторизация
path('register/', views.register_view, name='register'),
path('login/', views.login_view, name='login'),
path('logout/', views.logout_view, name='logout'),
]

492
ui/views.py Normal file
View File

@@ -0,0 +1,492 @@
import base64
import json
import time
from typing import List, Dict, Any, Optional
from django.http import Http404, HttpResponse
from django.shortcuts import render, redirect
from django.views.decorators.http import require_http_methods, require_POST
from django.contrib import messages
from django.core.exceptions import PermissionDenied
from django.conf import settings
from django.views.decorators.csrf import csrf_exempt
from . import api
from .api import ApiError
# -------- helpers (cookies/JWT) --------
def _cookie_secure() -> bool:
# в dev можно False, в prod обязательно True
return not getattr(settings, "DEBUG", True)
def _jwt_exp_seconds(token: Optional[str], default_sec: int = 12 * 3600) -> int:
"""
Пытаемся вытащить exp из JWT и посчитать max_age для куки.
Если не получилось — даём дефолт (12 часов).
"""
if not token or token.count(".") != 2:
return default_sec
try:
payload_b64 = token.split(".")[1]
payload_b64 += "=" * (-len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(payload_b64.encode()).decode("utf-8"))
exp = int(payload.get("exp", 0))
now = int(time.time())
if exp > now:
# добавим небольшой «запас» (минус 60 сек)
return max(60, exp - now - 60)
return default_sec
except Exception:
return default_sec
def _set_auth_cookies(resp: HttpResponse, access_token: Optional[str], refresh_token: Optional[str]) -> None:
"""
Кладём токены в HttpOnly cookies с корректным сроком жизни.
"""
if access_token:
resp.set_cookie(
"access_token",
access_token,
max_age=_jwt_exp_seconds(access_token),
httponly=True,
samesite="Lax",
secure=_cookie_secure(),
path="/",
)
if refresh_token:
resp.set_cookie(
"refresh_token",
refresh_token,
max_age=_jwt_exp_seconds(refresh_token, default_sec=7 * 24 * 3600),
httponly=True,
samesite="Lax",
secure=_cookie_secure(),
path="/",
)
def _clear_auth_cookies(resp: HttpResponse) -> None:
resp.delete_cookie("access_token", path="/")
resp.delete_cookie("refresh_token", path="/")
# -------- UI helpers --------
def index(request):
return render(request, "ui/index.html", {})
def _auth_required_partial(request) -> HttpResponse:
# Миниpartial под HTMX для кнопки "в избранное", если нет логина
return render(request, "ui/components/like_button_login_required.html", {})
def _is_admin(request) -> bool:
return (request.session.get("user_role") or "").upper() == "ADMIN"
def _user_to_profile_stub(u: Dict[str, Any]) -> Dict[str, Any]:
name = u.get("full_name") or u.get("email") or "Без имени"
return {
"id": u.get("id"),
"name": name,
"email": u.get("email") or "",
"role": (u.get("role") or "").upper() or "CLIENT",
"verified": u.get("is_active", False),
# ↓ ключевые правки — чтобы шаблон не генерил src="None"
"age": None,
"city": None,
"about": "",
"photo": "", # было: None
"interests": [],
"liked": False,
}
def _format_validation(payload: Optional[dict]) -> Optional[str]:
"""Сборка сообщений 422 ValidationError в одну строку."""
if not payload:
return None
det = payload.get("detail")
if isinstance(det, list):
msgs = []
for item in det:
msg = item.get("msg")
loc = item.get("loc")
if isinstance(loc, list) and loc:
field = ".".join(str(x) for x in loc if isinstance(x, (str, int)))
if field and field not in ("body",):
msgs.append(f"{field}: {msg}")
else:
msgs.append(msg)
elif msg:
msgs.append(msg)
return "; ".join(m for m in msgs if m)
return payload.get("message") or payload.get("detail")
# ---------------- Auth ----------------
@require_http_methods(["GET", "POST"])
def login_view(request):
if request.method == "POST":
email = (request.POST.get("email") or "").strip()
password = (request.POST.get("password") or "").strip()
if not email or not password:
messages.error(request, "Укажите email и пароль")
else:
try:
token_pair = api.login(request, email, password) # POST /auth/v1/token
access = token_pair.get("access_token")
refresh = token_pair.get("refresh_token") or token_pair.get("refresh")
if not access:
raise ApiError(0, "API не вернул access_token")
# Пока храним и в сессии, и в куки (куки — для твоей задачи; сессия — на случай refresh внутри запроса)
request.session["access_token"] = access
if refresh:
request.session["refresh_token"] = refresh
me = api.get_current_user(request) # GET /auth/v1/me
request.session["user_id"] = me.get("id") or me.get("user_id")
request.session["user_email"] = me.get("email")
request.session["user_full_name"] = me.get("full_name") or me.get("email")
request.session["user_role"] = me.get("role") or "CLIENT"
request.session.modified = True
user_label = request.session.get("user_full_name") or request.session.get("user_email") or "пользователь"
messages.success(request, f"Здравствуйте, {user_label}!")
next_url = request.GET.get("next")
if not next_url:
next_url = "profiles" if _is_admin(request) else "cabinet"
resp = redirect(next_url)
_set_auth_cookies(resp, access, refresh)
return resp
except PermissionDenied:
messages.error(request, "Неверные учётные данные")
except ApiError as e:
messages.error(request, f"Ошибка входа: {e}")
return render(request, "ui/login.html", {})
@require_http_methods(["GET", "POST"])
def register_view(request):
"""
Регистрация → авто‑логин → установка токенов в cookies → редирект.
"""
if request.method == "POST":
email = (request.POST.get("email") or "").strip()
password = (request.POST.get("password") or "").strip()
full_name = (request.POST.get("full_name") or "").strip() or None
if not email or not password:
messages.error(request, "Укажите email и пароль")
else:
try:
api.register_user(request, email=email, password=password, full_name=full_name, role='CLIENT')
token_pair = api.login(request, email=email, password=password)
access = token_pair.get("access_token")
refresh = token_pair.get("refresh_token") or token_pair.get("refresh")
if not access:
raise ApiError(0, "API не вернул access_token после регистрации")
request.session["access_token"] = access
if refresh:
request.session["refresh_token"] = refresh
me = api.get_current_user(request)
request.session["user_id"] = me.get("id") or me.get("user_id")
request.session["user_email"] = me.get("email")
request.session["user_full_name"] = me.get("full_name") or me.get("email")
request.session["user_role"] = me.get("role") or "CLIENT"
request.session.modified = True
messages.success(request, "Регистрация успешна! Вы вошли в систему.")
next_url = request.GET.get("next")
if not next_url:
next_url = "profiles" if _is_admin(request) else "cabinet"
resp = redirect(next_url)
_set_auth_cookies(resp, access, refresh)
return resp
except ApiError as e:
payload = getattr(e, "payload", None)
if payload and isinstance(payload, dict):
nice = _format_validation(payload)
messages.error(request, nice or f"Ошибка регистрации: {e}")
else:
messages.error(request, f"Ошибка регистрации: {e}")
except PermissionDenied:
messages.info(request, "Регистрация прошла, войдите под своим email/паролем")
return redirect("login")
return render(request, "ui/register.html", {})
@require_http_methods(["POST", "GET"])
def logout_view(request):
for key in ("access_token", "refresh_token", "user_id", "user_email", "user_full_name", "user_role", "likes"):
request.session.pop(key, None)
request.session.modified = True
messages.info(request, "Вы вышли из аккаунта")
resp = redirect("index")
_clear_auth_cookies(resp)
return resp
# ---------------- Catalog (Adminonly) ----------------
@require_http_methods(["GET"])
def profile_list(request):
"""
Каталог анкет (по сути — пользователей) с фильтрами и пагинацией.
Доступно только ADMIN (по API /auth/v1/users; у клиента прав нет).
Фильтры клиентские: q (имя/email), role, active, email_domain; сортировка; page/limit.
"""
if not (request.session.get("access_token") or request.COOKIES.get("access_token")):
messages.info(request, "Войдите, чтобы открыть каталог")
return redirect("login")
if not _is_admin(request):
messages.info(request, "Каталог доступен только администраторам. Перенаправляем в Кабинет.")
return redirect("cabinet")
# --- читаем query-параметры ---
q = (request.GET.get("q") or "").strip().lower()
role = (request.GET.get("role") or "").strip().upper() # CLIENT|ADMIN|"" (любой)
active = request.GET.get("active") # "1"|"0"|None
email_domain = (request.GET.get("domain") or "").strip().lower().lstrip("@")
# сортировка: name, name_desc, email, email_desc (по умолчанию name)
sort = (request.GET.get("sort") or "name").strip().lower()
page = max(1, int(request.GET.get("page") or 1))
limit = min(max(1, int(request.GET.get("limit") or 20)), 200) # API максимум 200, см. спеки :contentReference[oaicite:1]{index=1}
offset = (page - 1) * limit
error: Optional[str] = None
profiles: List[Dict[str, Any]] = []
page_info = {"page": page, "limit": limit, "has_prev": page > 1, "has_next": False}
try:
# Серверная пагинация есть, фильтров — нет (кроме offset/limit) → забираем страницу
data = api.list_users(request, offset=offset, limit=limit) # GET /auth/v1/users :contentReference[oaicite:2]{index=2}
users: List[Dict[str, Any]] = (data.get("items") if isinstance(data, dict) else data) or []
# --- клиентская фильтрация ---
def keep(u: Dict[str, Any]) -> bool:
if q:
fn = (u.get("full_name") or "").lower()
em = (u.get("email") or "").lower()
if q not in fn and q not in em:
return False
if role and (u.get("role") or "").upper() != role:
return False
if active in ("1", "0"):
is_act = bool(u.get("is_active"))
if (active == "1" and not is_act) or (active == "0" and is_act):
return False
if email_domain:
em = (u.get("email") or "").lower()
dom = em.split("@")[-1] if "@" in em else ""
if dom != email_domain:
return False
return True
users = [u for u in users if keep(u)]
# --- сортировка ---
def key_name(u): return (u.get("full_name") or u.get("email") or "").lower()
def key_email(u): return (u.get("email") or "").lower()
if sort == "name":
users.sort(key=key_name)
elif sort == "name_desc":
users.sort(key=key_name, reverse=True)
elif sort == "email":
users.sort(key=key_email)
elif sort == "email_desc":
users.sort(key=key_email, reverse=True)
# Преобразуем в «анкеты»
profiles = [_user_to_profile_stub(u) for u in users]
# отметка лайков (локальная сессия)
liked_ids = set(request.session.get("likes", []))
for p in profiles:
p["liked"] = p.get("id") in liked_ids
# has_next — на глаз: если сервер отдал ровно limit без наших фильтров,
# считаем, что следующая страница потенциально есть
page_info["has_next"] = (len(users) == limit and not (q or role or active in ("1","0") or email_domain))
# (если включены клиентские фильтры — не знаем полный объём; оставим conservative False)
except PermissionDenied as e:
messages.error(request, f"Нет доступа к каталогу: {e}")
return redirect("cabinet")
except ApiError as e:
error = str(e)
ctx = {
"profiles": profiles,
"filters": {
"q": (request.GET.get("q") or "").strip(),
"role": role,
"active": (active or ""),
"domain": (request.GET.get("domain") or "").strip(),
"sort": sort,
"page": page,
"limit": limit,
},
"count": len(profiles),
"page": page_info,
"error": error,
"page_sizes": [10, 20, 50, 100, 200],
}
return render(request, "ui/profiles_list.html", ctx)
@require_http_methods(["GET"])
def profile_detail(request, pk: str):
"""
Детальная карточка пользователя — тоже ADMINonly.
"""
if not (request.session.get("access_token") or request.COOKIES.get("access_token")):
return redirect("login")
if not _is_admin(request):
messages.info(request, "Детали пользователей доступны только администраторам.")
return redirect("cabinet")
try:
user = api.get_user(request, user_id=str(pk))
except PermissionDenied as e:
messages.error(request, f"Нет доступа: {e}")
return redirect("cabinet")
except ApiError as e:
if e.status == 404:
raise Http404("Пользователь не найден")
messages.error(request, str(e))
return redirect("profiles")
profile = _user_to_profile_stub(user)
liked_ids = set(request.session.get("likes", []))
liked = profile.get("id") in liked_ids
return render(request, "ui/profile_detail.html", {"profile": profile, "liked": liked})
@require_POST
def like_profile(request, pk: str):
"""
Локальный «лайк» (для демо). Не требует API.
"""
if not (request.session.get("access_token") or request.COOKIES.get("access_token")):
return _auth_required_partial(request)
likes = set(request.session.get("likes", []))
if str(pk) in likes:
likes.remove(str(pk))
liked = False
else:
likes.add(str(pk))
liked = True
request.session["likes"] = list(likes)
request.session.modified = True
return render(request, "ui/components/like_button.html", {"profile_id": pk, "liked": liked})
# ---------------- Кабинет: мой профиль ----------------
@require_http_methods(["GET", "POST"])
def cabinet_view(request):
"""
Мой профиль:
- GET: /profiles/v1/profiles/me; если 404 — пустая форма создания
- POST: /profiles/v1/profiles (создать/заполнить свой профиль)
"""
if not (request.session.get("access_token") or request.COOKIES.get("access_token")):
messages.info(request, "Для доступа к кабинету войдите в систему")
return redirect("login")
if request.method == "POST":
gender = (request.POST.get("gender") or "").strip()
city = (request.POST.get("city") or "").strip()
languages = [s.strip() for s in (request.POST.get("languages") or "").split(",") if s.strip()]
interests = [s.strip() for s in (request.POST.get("interests") or "").split(",") if s.strip()]
if not gender or not city:
messages.error(request, "Укажите пол и город")
else:
try:
profile = api.create_my_profile(request, gender=gender, city=city, languages=languages, interests=interests)
messages.success(request, "Профиль создан")
return render(request, "ui/cabinet.html", {"profile": profile, "has_profile": True})
except PermissionDenied:
messages.error(request, "Сессия истекла, войдите снова")
return redirect("login")
except ApiError as e:
payload = getattr(e, "payload", None)
nice = _format_validation(payload) if isinstance(payload, dict) else None
messages.error(request, nice or f"Ошибка сохранения профиля: {e}")
# GET
try:
profile = api.get_my_profile(request)
# шапка кабинета — имя из сессии (или email)
header_name = request.session.get("user_full_name") or request.session.get("user_email") or ""
return render(request, "ui/cabinet.html", {"profile": profile, "has_profile": True, "header_name": header_name})
except ApiError as e:
if e.status == 404:
header_name = request.session.get("user_full_name") or request.session.get("user_email") or ""
return render(request, "ui/cabinet.html", {"profile": None, "has_profile": False, "header_name": header_name})
messages.error(request, f"Ошибка загрузки профиля: {e}")
return render(request, "ui/cabinet.html", {"profile": None, "has_profile": False})
except PermissionDenied:
messages.error(request, "Сессия истекла, войдите снова")
return redirect("login")
@require_POST
def cabinet_upload_photo(request):
# Требуется логин
if not (request.session.get("access_token") or request.COOKIES.get("access_token")):
messages.info(request, "Войдите, чтобы загрузить фото")
return redirect("login")
f = request.FILES.get("photo")
if not f:
messages.error(request, "Файл не выбран")
return redirect("cabinet")
if f.size and f.size > 5 * 1024 * 1024:
messages.error(request, "Файл слишком большой (макс. 5 МБ)")
return redirect("cabinet")
try:
api.upload_my_photo(request, f)
messages.success(request, "Фото обновлено")
except PermissionDenied:
messages.error(request, "Сессия истекла, войдите снова")
return redirect("login")
except ApiError as e:
if e.status in (404, 405):
messages.error(request, "Бэкенд пока не поддерживает загрузку фото (нет эндпоинта).")
else:
messages.error(request, f"Ошибка загрузки: {e}")
return redirect("cabinet")
@require_POST
def cabinet_delete_photo(request):
if not (request.session.get("access_token") or request.COOKIES.get("access_token")):
messages.info(request, "Войдите, чтобы удалить фото")
return redirect("login")
try:
api.delete_my_photo(request)
messages.success(request, "Фото удалено")
except PermissionDenied:
messages.error(request, "Сессия истекла, войдите снова")
return redirect("login")
except ApiError as e:
if e.status in (404, 405):
messages.error(request, "Удаление фото не поддерживается бэкендом.")
else:
messages.error(request, f"Ошибка удаления: {e}")
return redirect("cabinet")