Files
marriage_frontend/ui/views.py
2025-08-10 17:28:38 +09:00

347 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from typing import List, Dict, Any, Optional
from django.http import Http404, HttpResponse
from django.shortcuts import render, redirect
from django.views.decorators.http import require_http_methods, require_POST
from django.contrib import messages
from django.core.exceptions import PermissionDenied
from django.urls import reverse, NoReverseMatch
from . import api
from .api import ApiError
# --- Reverse helper -----------------------------------------------------------
def _reverse_first(*names: str) -> str:
    """Resolve the first URL name that reverses successfully.

    The candidates are tried in the order given; ``"ui:index"`` is tried
    after them as a last resort, and ``"/"`` is returned when nothing
    reverses at all.
    """
    for candidate in (*names, "ui:index"):
        try:
            url = reverse(candidate)
        except NoReverseMatch:
            # This name is not registered; move on to the next candidate.
            continue
        return url
    return "/"
def _is_logged(request) -> bool:
return bool(request.session.get("access_token") or request.COOKIES.get("access_token"))
def _is_admin(request) -> bool:
return (request.session.get("user_role") or "").upper() == "ADMIN"
def _set_tokens_and_user(request, tokens: Dict[str, Any], me: Dict[str, Any]):
request.session["access_token"] = tokens.get("access_token")
request.session["refresh_token"] = tokens.get("refresh_token")
request.session["user_id"] = me.get("id")
request.session["user_email"] = me.get("email")
request.session["user_full_name"] = me.get("full_name")
request.session["user_role"] = me.get("role")
request.session.modified = True
def _user_to_profile_stub(u: Dict[str, Any]) -> Dict[str, Any]:
name = u.get("full_name") or u.get("email") or "Без имени"
return {
"id": u.get("id"),
"name": name,
"email": u.get("email") or "",
"role": (u.get("role") or "").upper() or "CLIENT",
"verified": bool(u.get("is_active", False)),
"age": None,
"city": None,
"about": "",
"photo": "",
"interests": [],
"liked": False,
}
# === public pages ==============================================================
def index(request):
    """Render the public landing page with an empty context."""
    context: Dict[str, Any] = {}
    return render(request, "ui/index.html", context)
# === catalog (admin) ==========================================================
@require_http_methods(["GET"])
def profile_list(request):
    """Render the administrator-only user catalog.

    Visitors without an access token are redirected to the login page and
    non-admin users to the cabinet. Query parameters read from
    ``request.GET``:

    * ``q`` - case-insensitive substring of the full name or email;
    * ``role`` - exact role (compared upper-cased);
    * ``active`` - ``"1"`` / ``"0"`` to keep only active / inactive users;
    * ``domain`` - email domain (a leading ``@`` is ignored);
    * ``sort`` - ``name`` (default), ``name_desc``, ``email``, ``email_desc``;
    * ``page`` / ``limit`` - paging; ``limit`` is clamped to 1..200.

    A single page of users is fetched through ``api.list_users`` and then
    filtered and sorted locally, so filters only see the fetched page.
    An ``ApiError`` is shown in the template via the ``error`` context key;
    ``PermissionDenied`` redirects to the cabinet.
    """
    if not _is_logged(request):
        messages.info(request, "Войдите, чтобы открыть каталог")
        return redirect(_reverse_first("login", "ui:login"))
    if not _is_admin(request):
        messages.info(request, "Каталог доступен только администраторам. Перенаправляем в Кабинет.")
        return redirect(_reverse_first("cabinet", "ui:cabinet"))

    def _int_param(name: str, default: int) -> int:
        # Query strings are untrusted: a non-numeric value must not turn
        # into an unhandled ValueError (HTTP 500), so fall back to default.
        try:
            return int(request.GET.get(name) or default)
        except (TypeError, ValueError):
            return default

    q = (request.GET.get("q") or "").strip().lower()
    role = (request.GET.get("role") or "").strip().upper()
    active = request.GET.get("active")
    email_domain = (request.GET.get("domain") or "").strip().lower().lstrip("@")
    sort = (request.GET.get("sort") or "name").strip().lower()
    page = max(1, _int_param("page", 1))
    limit = min(max(1, _int_param("limit", 20)), 200)
    offset = (page - 1) * limit

    error: Optional[str] = None
    profiles: List[Dict[str, Any]] = []
    page_info = {"page": page, "limit": limit, "has_prev": page > 1, "has_next": False}

    try:
        data = api.list_users(request, offset=offset, limit=limit)
        # The API result may be either a dict with an "items" list or a bare list.
        users: List[Dict[str, Any]] = (data.get("items") if isinstance(data, dict) else data) or []

        def keep(u: Dict[str, Any]) -> bool:
            """Return True when the user passes every active filter."""
            fn = (u.get("full_name") or "")
            em = (u.get("email") or "")
            if q and (q not in fn.lower() and q not in em.lower()):
                return False
            if role and (u.get("role") or "").upper() != role:
                return False
            if active in ("1", "0"):
                is_act = bool(u.get("is_active", False))
                if (active == "1" and not is_act) or (active == "0" and is_act):
                    return False
            if email_domain:
                dom = em.lower().split("@")[-1] if "@" in em else ""
                if dom != email_domain:
                    return False
            return True

        users = [u for u in users if keep(u)]

        def key_name(u):
            return (u.get("full_name") or u.get("email") or "").lower()

        def key_email(u):
            return (u.get("email") or "").lower()

        # Unknown sort values leave the API order untouched.
        if sort == "name":
            users.sort(key=key_name)
        elif sort == "name_desc":
            users.sort(key=key_name, reverse=True)
        elif sort == "email":
            users.sort(key=key_email)
        elif sort == "email_desc":
            users.sort(key=key_email, reverse=True)

        profiles = [_user_to_profile_stub(u) for u in users]

        # like_profile stores ids as strings, so compare string forms;
        # otherwise a numeric id from the API would never match.
        liked_ids = {str(x) for x in request.session.get("likes", [])}
        for p in profiles:
            p["liked"] = str(p.get("id")) in liked_ids

        # A full page suggests there may be more; with local filters active
        # the page length is no longer a reliable signal, so report False.
        page_info["has_next"] = (len(users) == limit and not (q or role or active in ("1", "0") or email_domain))
    except PermissionDenied as e:
        messages.error(request, f"Нет доступа к каталогу: {e}")
        return redirect(_reverse_first("cabinet", "ui:cabinet"))
    except ApiError as e:
        error = str(e)

    ctx = {
        "profiles": profiles,
        "filters": {
            "q": (request.GET.get("q") or "").strip(),
            "role": role,
            "active": (active or ""),
            "domain": (request.GET.get("domain") or "").strip(),
            "sort": sort,
            "page": page,
            "limit": limit,
        },
        "count": len(profiles),
        "page": page_info,
        "error": error,
        "page_sizes": [10, 20, 50, 100, 200],
    }
    return render(request, "ui/profiles_list.html", ctx)
@require_http_methods(["GET"])
def profile_detail(request, pk):
    """Render the detail page for a single user.

    The user is loaded with ``api.get_user``. ``PermissionDenied`` redirects
    to the login page, an ``ApiError`` with status 404 becomes ``Http404``,
    and any other ``ApiError`` is flashed before redirecting to the catalog.
    """
    try:
        user = api.get_user(request, str(pk))
    except PermissionDenied:
        messages.error(request, "Сессия истекла, войдите снова")
        return redirect(_reverse_first("login", "ui:login"))
    except ApiError as e:
        if e.status == 404:
            raise Http404("Пользователь не найден")
        messages.error(request, str(e))
        return redirect(_reverse_first("profiles", "ui:profiles"))

    stub = _user_to_profile_stub(user)
    # like_profile stores ids as strings, so compare string forms;
    # otherwise a numeric id from the API would never match.
    liked_ids = {str(x) for x in request.session.get("likes", [])}
    liked = str(stub["id"]) in liked_ids
    return render(request, "ui/profile_detail.html", {"profile": stub, "liked": liked})
@require_POST
def like_profile(request, pk):
    """Toggle the session-stored like for a profile and render the like button.

    Liked ids are kept in ``request.session["likes"]`` as a list of strings.
    """
    profile_id = str(pk)
    liked_set = set(request.session.get("likes", []))

    # The new state is the opposite of the current membership.
    liked = profile_id not in liked_set
    if liked:
        liked_set.add(profile_id)
    else:
        liked_set.discard(profile_id)

    request.session["likes"] = list(liked_set)
    request.session.modified = True

    context = {"profile_id": profile_id, "liked": liked}
    return render(request, "ui/components/like_button.html", context)
# === auth =====================================================================
@require_http_methods(["GET", "POST"])
def login_view(request):
    """Show the login form and, on POST, sign the user in.

    On success the tokens and user fields are stored in the session, the
    tokens are also set as HTTP-only cookies (7 days), and the user is
    redirected to the ``next`` query parameter when it points to this host,
    otherwise to the catalog (admins) or the cabinet (everyone else).
    On failure an error message is flashed and the form is rendered again.
    """
    # Imported locally so this view does not depend on a module-level import.
    from django.utils.http import url_has_allowed_host_and_scheme

    if request.method == "POST":
        email = (request.POST.get("email") or "").strip()
        password = (request.POST.get("password") or "").strip()
        if not email or not password:
            messages.error(request, "Укажите email и пароль")
        else:
            try:
                tokens = api.login(request, email, password)
                me = api.get_me(request)
                _set_tokens_and_user(request, tokens, me)

                next_url = request.GET.get("next")
                # ``next`` comes from the query string and is untrusted:
                # ignore it unless it targets this host, to avoid an open
                # redirect to an attacker-controlled site.
                if next_url and not url_has_allowed_host_and_scheme(
                    next_url,
                    allowed_hosts={request.get_host()},
                    require_https=request.is_secure(),
                ):
                    next_url = None
                if not next_url:
                    if (me.get("role") or "").upper() == "ADMIN":
                        next_url = _reverse_first("profiles", "ui:profiles")
                    else:
                        next_url = _reverse_first("cabinet", "ui:cabinet")

                resp = redirect(next_url)
                max_age = 7 * 24 * 3600  # cookie lifetime: 7 days, in seconds
                resp.set_cookie("access_token", tokens.get("access_token"), max_age=max_age, httponly=True, samesite="Lax")
                resp.set_cookie("refresh_token", tokens.get("refresh_token"), max_age=max_age, httponly=True, samesite="Lax")
                messages.success(request, "Вы успешно вошли")
                return resp
            except PermissionDenied as e:
                messages.error(request, f"Доступ запрещён: {e}")
            except ApiError as e:
                messages.error(request, f"Ошибка входа: {e.args[0]}")
    return render(request, "ui/login.html", {})
@require_http_methods(["GET", "POST"])
def register_view(request):
    """Show the registration form and, on POST, register and sign the user in.

    A new user is created with role ``CLIENT`` and then logged in right
    away: tokens and user fields go into the session, the tokens are also
    set as HTTP-only cookies (7 days), and the user is redirected to the
    cabinet. On failure an error message is flashed and the form is
    rendered again.
    """
    if request.method == "POST":
        email = (request.POST.get("email") or "").strip()
        password = (request.POST.get("password") or "").strip()
        full_name = (request.POST.get("full_name") or "").strip() or None
        if not email or not password:
            messages.error(request, "Укажите email и пароль")
        else:
            try:
                api.register_user(request, email, password, full_name, role="CLIENT")
                tokens = api.login(request, email, password)
                me = api.get_me(request)
                _set_tokens_and_user(request, tokens, me)

                resp = redirect(_reverse_first("cabinet", "ui:cabinet"))
                max_age = 7 * 24 * 3600  # cookie lifetime: 7 days, in seconds
                resp.set_cookie("access_token", tokens.get("access_token"), max_age=max_age, httponly=True, samesite="Lax")
                resp.set_cookie("refresh_token", tokens.get("refresh_token"), max_age=max_age, httponly=True, samesite="Lax")
                messages.success(request, "Регистрация успешна")
                return resp
            except PermissionDenied as e:
                # Same handling as login_view: the follow-up login/get_me
                # calls may be denied; show a message instead of a bare 403.
                messages.error(request, f"Доступ запрещён: {e}")
            except ApiError as e:
                messages.error(request, f"Ошибка регистрации: {e.args[0]}")
    return render(request, "ui/register.html", {})
@require_http_methods(["POST", "GET"])
def logout_view(request):
    """Sign the user out: drop auth cookies and session keys, then go to the index page."""
    response = redirect(_reverse_first("index", "ui:index"))
    for cookie_name in ("access_token", "refresh_token"):
        response.delete_cookie(cookie_name)

    session = request.session
    auth_keys = (
        "auth",
        "access_token",
        "refresh_token",
        "user_id",
        "user_email",
        "user_full_name",
        "user_role",
    )
    for key in auth_keys:
        session.pop(key, None)
    session.modified = True

    messages.info(request, "Вы вышли из аккаунта")
    return response
# === cabinet ==================================================================
@require_http_methods(["GET", "POST"])
def cabinet_view(request):
    """Render the user's cabinet and handle its POST actions.

    The current profile is always loaded first (a 404 from the API simply
    means "no profile yet"). On POST the ``action`` field selects between
    ``create_profile`` and ``update_name``; a successful action redirects
    back to the cabinet, anything else falls through to rendering the page.
    """
    if not _is_logged(request):
        messages.info(request, "Войдите, чтобы открыть кабинет")
        return redirect(_reverse_first("login", "ui:login"))

    def _text_field(name):
        # Trimmed POST value, empty string when absent.
        return (request.POST.get(name) or "").strip()

    def _csv_field(name):
        # Comma-separated POST value split into non-empty trimmed items.
        raw = request.POST.get(name) or ""
        return [part.strip() for part in raw.split(",") if part.strip()]

    profile = None
    has_profile = False
    try:
        profile = api.get_my_profile(request)
    except PermissionDenied:
        messages.error(request, "Сессия истекла, войдите снова")
        return redirect(_reverse_first("login", "ui:login"))
    except ApiError as e:
        # 404 is not reported: it only means there is no profile yet.
        if e.status != 404:
            messages.error(request, f"Ошибка чтения профиля: {e}")
    else:
        has_profile = bool(profile)

    if request.method == "POST":
        action = _text_field("action")
        try:
            if action == "create_profile":
                gender = _text_field("gender")
                city = _text_field("city")
                languages = _csv_field("languages")
                interests = _csv_field("interests")
                api.create_my_profile(request, gender, city, languages, interests)
                messages.success(request, "Профиль создан")
                return redirect(_reverse_first("cabinet", "ui:cabinet"))
            elif action == "update_name":
                full_name = _text_field("full_name")
                if full_name:
                    api.update_user_me(request, request.session.get("user_id"), full_name=full_name)
                    request.session["user_full_name"] = full_name
                    request.session.modified = True
                    messages.success(request, "Имя обновлено")
                    return redirect(_reverse_first("cabinet", "ui:cabinet"))
                messages.error(request, "Имя не может быть пустым")
        except PermissionDenied:
            messages.error(request, "Сессия истекла, войдите снова")
            return redirect(_reverse_first("login", "ui:login"))
        except ApiError as e:
            messages.error(request, f"Ошибка: {e}")

    return render(
        request,
        "ui/cabinet.html",
        {"has_profile": has_profile, "profile": profile},
    )
@require_POST
def cabinet_upload_photo(request):
    """Upload a new profile photo for the current user and return to the cabinet.

    The file is taken from the ``file`` or ``photo`` upload field and sent
    through ``api.upload_my_profile_photo``. Missing authorization or a
    ``PermissionDenied`` from the API redirects to the login page; every
    other outcome flashes a message and redirects to the cabinet.
    """
    # Require authorization: cookie, session token, or the "auth" session dict.
    has_token = (
        request.COOKIES.get("access_token")
        or request.session.get("access_token")
        or (request.session.get("auth") or {}).get("access_token")
    )
    if not has_token:
        messages.error(request, "Сначала войдите")
        return redirect(_reverse_first("login", "ui:login"))

    file_obj = request.FILES.get("file") or request.FILES.get("photo")
    if not file_obj:
        messages.error(request, "Не выбрано фото")
        return redirect(_reverse_first("cabinet", "ui:cabinet"))

    try:
        # The returned profile is not needed here; the cabinet page
        # reloads the profile itself after the redirect.
        api.upload_my_profile_photo(request, file_obj)
    except PermissionDenied:
        messages.error(request, "Сессия истекла, войдите снова")
        return redirect(_reverse_first("login", "ui:login"))
    except ApiError as e:
        # The payload may be absent or not a dict (e.g. a non-JSON error
        # body); never let error reporting itself raise.
        payload = getattr(e, "payload", None)
        detail = payload.get("detail") if isinstance(payload, dict) else None
        reason = detail or (e.args[0] if e.args else e)
        messages.error(request, f"Не удалось загрузить фото: {reason}")
    else:
        messages.success(request, "Фото успешно обновлено")
    return redirect(_reverse_first("cabinet", "ui:cabinet"))
@require_POST
def cabinet_delete_photo(request):
    """Delete the current user's profile photo and return to the cabinet.

    Unauthenticated requests and ``PermissionDenied`` from the API redirect
    to the login page. An ``ApiError`` with status 404 or 405 is reported
    as "not supported by the backend"; other API errors are shown as-is.
    """
    if not _is_logged(request):
        messages.info(request, "Войдите, чтобы удалить фото")
        return redirect(_reverse_first("login", "ui:login"))

    try:
        api.delete_my_photo(request)
    except PermissionDenied:
        messages.error(request, "Сессия истекла, войдите снова")
        return redirect(_reverse_first("login", "ui:login"))
    except ApiError as e:
        unsupported = e.status in (404, 405)
        text = (
            "Удаление фото не поддерживается бэкендом."
            if unsupported
            else f"Ошибка удаления: {e}"
        )
        messages.error(request, text)
    else:
        messages.success(request, "Фото удалено")

    return redirect(_reverse_first("cabinet", "ui:cabinet"))