Files
Andrey K. Choi 0729e9196b
Some checks reported errors
continuous-integration/drone/push Build encountered an error
i18n, messages are in templates
2025-08-13 05:21:25 +09:00

372 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, List
from telegram import (
Update,
InlineKeyboardMarkup,
InlineKeyboardButton,
InputMediaPhoto,
)
from telegram.ext import (
ContextTypes,
CommandHandler,
CallbackQueryHandler,
ConversationHandler,
MessageHandler,
filters,
)
from sqlalchemy import desc
from app.core.db import db_session
from app.core.i18n import t
from app.repositories.admin_repo import AdminRepository
from app.repositories.candidate_repo import CandidateRepository
from app.models.candidate import Candidate
from app.utils.common import csv_to_list
PAGE_SIZE = 5  # number of candidate cards fetched per page; can be changed
# ========================== Rendering utilities =============================
def _caption(c: Candidate) -> str:
    """Render the localized caption text for a candidate card.

    Text fields that are empty or ``None`` are passed to the template as
    empty strings; the verified/active flags are rendered through their own
    yes/no templates before being substituted into ``cand.caption``.
    """
    verified_key = "cand.verified_yes" if c.is_verified else "cand.verified_no"
    active_key = "cand.active_yes" if c.is_active else "cand.active_no"
    fields = {
        "id": c.id,
        "name": c.full_name or "",
        "username": c.username or "",
        "city": c.city or "",
        "goal": c.goal or "",
        "verified": t(verified_key),
        "active": t(active_key),
    }
    return t("cand.caption", **fields)
def cand_kb(c: Candidate) -> InlineKeyboardMarkup:
    """Build the inline keyboard attached to a candidate card.

    Row 1 holds the "open" and "verify" buttons; row 2 holds a single toggle
    button labelled "hide" for active candidates and "restore" otherwise.
    Callback data has the shape ``cand:<action>:<candidate id>``.
    """
    open_btn = InlineKeyboardButton(t("btn.open"), callback_data=f"cand:view:{c.id}")
    verify_btn = InlineKeyboardButton(t("btn.verify"), callback_data=f"cand:verify:{c.id}")
    toggle_label = t("btn.hide") if c.is_active else t("btn.restore")
    toggle_btn = InlineKeyboardButton(toggle_label, callback_data=f"cand:toggle:{c.id}")
    return InlineKeyboardMarkup([[open_btn, verify_btn], [toggle_btn]])
def pager_kb(prefix: str, page: int, has_prev: bool, has_next: bool) -> InlineKeyboardMarkup | None:
    """Build a one-row previous/next pagination keyboard.

    Args:
        prefix: Callback namespace, "candlist" or "listres".
        page: Zero-based index of the page currently shown.
        has_prev: Whether to include a button for ``page - 1``.
        has_next: Whether to include a button for ``page + 1``.

    Returns:
        The keyboard, or ``None`` when neither button is needed.
    """
    nav_row = []
    if has_prev:
        nav_row.append(InlineKeyboardButton(t("btn.prev"), callback_data=f"{prefix}:page:{page-1}"))
    if has_next:
        nav_row.append(InlineKeyboardButton(t("btn.next"), callback_data=f"{prefix}:page:{page+1}"))
    if not nav_row:
        return None
    return InlineKeyboardMarkup([nav_row])
# =================== /candidates with pagination ============================
async def list_candidates_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /candidates: show the first page of all candidates (admins only).

    Each candidate is sent as its own card (a photo with caption when an
    avatar is stored, plain text otherwise), newest first. When more than one
    page exists, a separate message carrying the pagination keyboard follows.
    """
    # effective_message also covers edited commands, for which
    # update.message is None and the original attribute access would crash.
    message = update.effective_message
    with db_session() as s:
        if not AdminRepository(s).is_admin(update.effective_user.id):
            await message.reply_text(t("err.not_allowed"))
            return

        page = 0
        context.user_data["candlist_page"] = page

        total = s.query(Candidate).count()
        rows = (
            s.query(Candidate)
            .order_by(desc(Candidate.created_at))
            .offset(page * PAGE_SIZE)
            .limit(PAGE_SIZE)
            .all()
        )
        if not rows:
            await message.reply_text(t("list.empty"))
            return

        for c in rows:
            if c.avatar_file_id:
                await message.reply_photo(photo=c.avatar_file_id, caption=_caption(c), reply_markup=cand_kb(c))
            else:
                await message.reply_text(text=_caption(c), reply_markup=cand_kb(c))

        kb = pager_kb("candlist", page, has_prev=False, has_next=(total > (page + 1) * PAGE_SIZE))
        if kb:
            # The pagination panel is sent as a separate message.
            # NOTE(review): Telegram rejects whitespace-only text; confirm this
            # literal is a non-space blank character in the real file.
            await message.reply_text(" ", reply_markup=kb)
async def candlist_pager(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``candlist:page:<n>`` callbacks from the /candidates pager.

    Updates the keyboard on the pagination panel message, then sends the
    candidate cards of the requested page as new messages. Only admins may
    use it; others get the panel text replaced by the "not allowed" message.

    Raises:
        ValueError: If the callback data is not ``candlist:page:<int>``.
    """
    q = update.callback_query
    await q.answer()

    _, _, page_str = q.data.split(":")  # candlist:page:<n>
    page = max(0, int(page_str))
    context.user_data["candlist_page"] = page

    with db_session() as s:
        if not AdminRepository(s).is_admin(update.effective_user.id):
            await q.edit_message_text(t("err.not_allowed"))
            return

        total = s.query(Candidate).count()
        rows = (
            s.query(Candidate)
            .order_by(desc(Candidate.created_at))
            .offset(page * PAGE_SIZE)
            .limit(PAGE_SIZE)
            .all()
        )

        # Refresh the pagination "panel" (it lives in its own message).
        kb = pager_kb("candlist", page, has_prev=(page > 0), has_next=(total > (page + 1) * PAGE_SIZE))
        try:
            await q.edit_message_reply_markup(kb)
        except Exception:
            # Best-effort fallback: if the markup edit fails, rewrite the text too.
            await q.edit_message_text(" ", reply_markup=kb)

        # The cards themselves go out as follow-up messages. They are sent via
        # context.bot, like the other handlers in this module: in
        # python-telegram-bot v20 a CallbackQuery has no public `bot`
        # attribute (only get_bot()), so `q.bot` raises AttributeError.
        chat_id = q.message.chat.id
        for c in rows:
            if c.avatar_file_id:
                await context.bot.send_photo(
                    chat_id=chat_id, photo=c.avatar_file_id, caption=_caption(c), reply_markup=cand_kb(c)
                )
            else:
                await context.bot.send_message(chat_id=chat_id, text=_caption(c), reply_markup=cand_kb(c))
# ======================== Card buttons ======================================
async def _edit_card(q, text: str, reply_markup: InlineKeyboardMarkup | None = None) -> None:
    """Replace the content of the card message behind callback query *q*.

    Photo cards carry their text in the caption, text cards in the message
    body, so the matching edit method is chosen from ``q.message.photo``.
    Telegram answers an edit that changes nothing with a "message is not
    modified" BadRequest; that specific error is ignored, any other is
    re-raised.
    """
    from telegram.error import BadRequest  # local import: only this helper needs it

    try:
        if q.message.photo:
            await q.edit_message_caption(caption=text, reply_markup=reply_markup)
        else:
            await q.edit_message_text(text=text, reply_markup=reply_markup)
    except BadRequest as exc:
        if "not modified" not in str(exc).lower():
            raise


async def callback_router(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``cand:<action>:<id>`` callbacks from candidate card buttons.

    Supported actions:
        view   -- re-render the card from the current database state;
        verify -- mark the candidate as verified, then re-render;
        toggle -- flip the candidate's active flag, then re-render.

    Callback data outside the ``cand:`` namespace and unknown actions are
    ignored. Non-admins and missing candidates get the card content replaced
    by the corresponding error message.

    Raises:
        ValueError: If the data is not exactly ``cand:<action>:<int>``.
    """
    q = update.callback_query
    await q.answer()

    data = q.data or ""
    if not data.startswith("cand:"):
        return

    _, action, id_str = data.split(":")
    cid = int(id_str)

    with db_session() as s:
        if not AdminRepository(s).is_admin(update.effective_user.id):
            await _edit_card(q, t("err.not_allowed"))
            return

        c = s.get(Candidate, cid)
        if not c:
            await _edit_card(q, t("err.not_found"))
            return

        if action == "verify":
            c.is_verified = True
            s.commit()
        elif action == "toggle":
            c.is_active = not c.is_active
            s.commit()
        elif action != "view":
            return  # unknown action: nothing to do

        # All three actions end by re-rendering the card from fresh state.
        await _edit_card(q, _caption(c), cand_kb(c))
# ============================= /view <id> ===================================
async def view_candidate_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/view <id>``: show one candidate card plus its photo gallery.

    Admins only. The card is sent first (a photo with caption when an avatar
    is stored, plain text otherwise), followed by the gallery photos in
    albums of up to 10 items.
    """
    # effective_message also covers edited commands, for which
    # update.message is None and the original attribute access would crash.
    message = update.effective_message
    with db_session() as s:
        if not AdminRepository(s).is_admin(update.effective_user.id):
            await message.reply_text(t("err.not_allowed"))
            return

        args = context.args or []
        if not args:
            await message.reply_text("Формат: /view 12")
            return
        try:
            cid = int(args[0])
        except ValueError:
            await message.reply_text("Укажи числовой id анкеты.")
            return

        c = s.get(Candidate, cid)
        if not c:
            await message.reply_text(t("err.not_found"))
            return

        caption = _caption(c)
        chat_id = update.effective_chat.id
        if c.avatar_file_id:
            await context.bot.send_photo(
                chat_id=chat_id, photo=c.avatar_file_id, caption=caption, reply_markup=cand_kb(c)
            )
        else:
            await context.bot.send_message(chat_id=chat_id, text=caption, reply_markup=cand_kb(c))

        gallery_ids = list(csv_to_list(c.gallery_file_ids) or [])
        album_max = 10  # Telegram accepts 2-10 items per media group
        for start in range(0, len(gallery_ids), album_max):
            chunk = gallery_ids[start:start + album_max]
            if len(chunk) == 1:
                # A media group must hold at least two items, so a lone photo
                # (single-photo gallery or trailing remainder) is sent on its own.
                await context.bot.send_photo(chat_id=chat_id, photo=chunk[0])
            else:
                album: List[InputMediaPhoto] = [InputMediaPhoto(media=fid) for fid in chunk]
                await context.bot.send_media_group(chat_id=chat_id, media=album)
# ===================== /list — filters + pagination =========================
# Conversation states of the /list filter dialog, in the order the handlers
# below hand over to each other: gender -> city -> verified -> active.
F_GENDER, F_CITY, F_VERIFIED, F_ACTIVE = range(4)
@dataclass
class ListFilters:
    """Filter criteria collected by the /list dialog.

    A value of ``None`` means "do not filter on this field".
    """

    # Gender value compared for equality against Candidate.gender.
    gender: str | None = None
    # City pattern matched case-insensitively against Candidate.city.
    city: str | None = None
    # Required verification status.
    verified: bool | None = None
    # Required active status; defaults to active candidates only.
    active: bool | None = True
def _kb_gender() -> InlineKeyboardMarkup:
    """Build the gender-selection keyboard: female/male on one row, skip below."""
    female_btn = InlineKeyboardButton(t("list.gender_f"), callback_data="list:gender:f")
    male_btn = InlineKeyboardButton(t("list.gender_m"), callback_data="list:gender:m")
    skip_btn = InlineKeyboardButton(t("list.skip"), callback_data="list:gender:skip")
    return InlineKeyboardMarkup([[female_btn, male_btn], [skip_btn]])
def _kb_yesno(prefix: str) -> InlineKeyboardMarkup:
    """Build a yes/no/skip keyboard whose callbacks are ``list:<prefix>:<answer>``."""
    yes_btn = InlineKeyboardButton("Да", callback_data=f"list:{prefix}:yes")
    no_btn = InlineKeyboardButton("Нет", callback_data=f"list:{prefix}:no")
    skip_btn = InlineKeyboardButton(t("list.skip"), callback_data=f"list:{prefix}:skip")
    return InlineKeyboardMarkup([[yes_btn, no_btn], [skip_btn]])
async def list_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Entry point of the /list dialog: reset filters and ask for gender.

    Returns:
        ``F_GENDER`` to continue the conversation, or
        ``ConversationHandler.END`` when the caller is not an admin.
    """
    # effective_message also covers edited commands, for which
    # update.message is None and the original attribute access would crash.
    message = update.effective_message

    # Only the permission check needs the database; release the session
    # before talking to Telegram.
    with db_session() as s:
        allowed = AdminRepository(s).is_admin(update.effective_user.id)
    if not allowed:
        await message.reply_text(t("err.not_allowed"))
        return ConversationHandler.END

    context.user_data["list_filters"] = ListFilters()
    context.user_data["list_page"] = 0
    await message.reply_text(t("list.start_gender"), reply_markup=_kb_gender())
    return F_GENDER
async def list_gender(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the gender choice from ``list:gender:<val>`` and ask for the city.

    "f" and "m" set the filter to the corresponding localized gender label;
    any other value (e.g. "skip") leaves the filter untouched.
    """
    query = update.callback_query
    await query.answer()

    _ns, _field, choice = query.data.split(":")  # list:gender:<val>
    criteria: ListFilters = context.user_data.get("list_filters")  # type: ignore
    label_keys = {"f": "list.gender_f", "m": "list.gender_m"}
    if choice in label_keys:
        criteria.gender = t(label_keys[choice])

    skip_button = InlineKeyboardButton(t("list.skip"), callback_data="list:city:skip")
    await query.edit_message_text(
        t("list.ask_city"),
        reply_markup=InlineKeyboardMarkup([[skip_button]]),
    )
    return F_CITY
async def list_city(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the city filter (typed text) or skip it (button), then ask about verification.

    A callback query leaves the city filter unset. Typed text is stripped and
    stored unless it equals the skip word (case-insensitive).
    """
    criteria: ListFilters = context.user_data.get("list_filters")  # type: ignore

    query = update.callback_query
    if query:
        await query.answer()
        await query.edit_message_text(t("list.ask_verified"))
        await query.message.reply_text(t("list.ask_verified"), reply_markup=_kb_yesno("verified"))
        return F_VERIFIED

    typed = (update.message.text or "").strip()
    if typed.lower() != "пропустить":
        criteria.city = typed
    await update.message.reply_text(t("list.ask_verified"), reply_markup=_kb_yesno("verified"))
    return F_VERIFIED
async def list_verified(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the verified filter from ``list:verified:<val>`` and ask about activity.

    "yes"/"no" set the filter to True/False; any other value leaves it as is.
    """
    query = update.callback_query
    await query.answer()

    _ns, _field, answer = query.data.split(":")
    criteria: ListFilters = context.user_data.get("list_filters")  # type: ignore
    flags = {"yes": True, "no": False}
    if answer in flags:
        criteria.verified = flags[answer]

    await query.edit_message_text(t("list.ask_active"))
    await query.message.reply_text(t("list.ask_active"), reply_markup=_kb_yesno("active"))
    return F_ACTIVE
async def list_active(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the active filter from ``list:active:<val>``, show results, end the dialog.

    "yes"/"no" set the filter to True/False; any other value leaves it as is.
    The result page is reset to 0 before the first page is rendered.
    """
    query = update.callback_query
    await query.answer()

    _ns, _field, answer = query.data.split(":")
    criteria: ListFilters = context.user_data.get("list_filters")  # type: ignore
    flags = {"yes": True, "no": False}
    if answer in flags:
        criteria.active = flags[answer]

    context.user_data["list_page"] = 0
    await query.edit_message_text(t("list.searching"))
    await _list_show_results(update, context)
    return ConversationHandler.END
async def _list_show_results(update: Update, context: ContextTypes.DEFAULT_TYPE, page: Optional[int] = None):
    """Send one page of candidates matching the filters stored in ``user_data``.

    Args:
        update: The update that triggered rendering; supplies user and chat.
        context: Handler context; ``user_data["list_filters"]`` and
            ``user_data["list_page"]`` are read from it.
        page: Zero-based page index; when ``None`` the stored ``list_page``
            (default 0) is used.

    Admins only. Cards are sent newest first; a separate message with the
    pagination keyboard follows when a previous or next page exists.
    """
    # The filters can be missing when this is reached from a stale pager
    # button (nothing stored in user_data any more); fall back to the
    # defaults instead of failing on attribute access of None.
    lf: ListFilters = context.user_data.get("list_filters") or ListFilters()
    if page is None:
        page = context.user_data.get("list_page", 0)
    chat_id = update.effective_chat.id

    with db_session() as s:
        if not AdminRepository(s).is_admin(update.effective_user.id):
            await context.bot.send_message(chat_id=chat_id, text=t("err.not_allowed"))
            return

        q = s.query(Candidate)
        if lf.gender:
            q = q.filter(Candidate.gender == lf.gender)
        if lf.city:
            q = q.filter(Candidate.city.ilike(lf.city))
        if lf.verified is not None:
            q = q.filter(Candidate.is_verified.is_(lf.verified))
        if lf.active is not None:
            q = q.filter(Candidate.is_active.is_(lf.active))

        total = q.count()
        rows = q.order_by(desc(Candidate.created_at)).offset(page * PAGE_SIZE).limit(PAGE_SIZE).all()
        if not rows:
            await context.bot.send_message(chat_id=chat_id, text=t("list.empty"))
            return

        # Send the cards.
        for c in rows:
            cap = _caption(c)
            if c.avatar_file_id:
                await context.bot.send_photo(chat_id=chat_id, photo=c.avatar_file_id, caption=cap, reply_markup=cand_kb(c))
            else:
                await context.bot.send_message(chat_id=chat_id, text=cap, reply_markup=cand_kb(c))

        # Pagination panel for the /list results.
        kb = pager_kb("listres", page, has_prev=(page > 0), has_next=(total > (page + 1) * PAGE_SIZE))
        if kb:
            # NOTE(review): Telegram rejects whitespace-only text; confirm this
            # literal is a non-space blank character in the real file.
            await context.bot.send_message(chat_id=chat_id, text=" ", reply_markup=kb)
async def list_results_pager(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``listres:page:<n>`` callbacks from the /list results pager.

    Stores the requested page (clamped to be non-negative), tries to strip
    the keyboard from the old panel message, and renders the page again.
    """
    query = update.callback_query
    await query.answer()

    _ns, _kind, raw_page = query.data.split(":")  # listres:page:<n>
    requested = int(raw_page)
    context.user_data["list_page"] = requested if requested > 0 else 0

    # Removing the old panel is best-effort; failure must not block the
    # re-render below.
    try:
        await query.edit_message_reply_markup(reply_markup=None)
    except Exception:
        pass

    await _list_show_results(update, context)