from __future__ import annotations from dataclasses import dataclass from typing import Optional, List from telegram import ( Update, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto, ) from telegram.ext import ( ContextTypes, CommandHandler, CallbackQueryHandler, ConversationHandler, MessageHandler, filters, ) from sqlalchemy import desc from app.core.db import db_session from app.core.i18n import t from app.repositories.admin_repo import AdminRepository from app.repositories.candidate_repo import CandidateRepository from app.models.candidate import Candidate from app.utils.common import csv_to_list PAGE_SIZE = 5 # можно менять # ========================== Утилиты отрисовки =============================== def _caption(c: Candidate) -> str: return t( "cand.caption", id=c.id, name=(c.full_name or "—"), username=(c.username or ""), city=(c.city or "—"), goal=(c.goal or "—"), verified=(t("cand.verified_yes") if c.is_verified else t("cand.verified_no")), active=(t("cand.active_yes") if c.is_active else t("cand.active_no")), ) def cand_kb(c: Candidate) -> InlineKeyboardMarkup: buttons = [ [ InlineKeyboardButton(t("btn.open"), callback_data=f"cand:view:{c.id}"), InlineKeyboardButton(t("btn.verify"), callback_data=f"cand:verify:{c.id}"), ], [ InlineKeyboardButton( t("btn.hide") if c.is_active else t("btn.restore"), callback_data=f"cand:toggle:{c.id}", ), ], ] return InlineKeyboardMarkup(buttons) def pager_kb(prefix: str, page: int, has_prev: bool, has_next: bool) -> InlineKeyboardMarkup | None: # prefix: "candlist" или "listres" btns = [] row = [] if has_prev: row.append(InlineKeyboardButton(t("btn.prev"), callback_data=f"{prefix}:page:{page-1}")) if has_next: row.append(InlineKeyboardButton(t("btn.next"), callback_data=f"{prefix}:page:{page+1}")) if row: btns.append(row) return InlineKeyboardMarkup(btns) return None # =================== /candidates с пагинацией =============================== async def list_candidates_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): with db_session() as s: if not AdminRepository(s).is_admin(update.effective_user.id): await update.message.reply_text(t("err.not_allowed")) return page = 0 context.user_data["candlist_page"] = page total = s.query(Candidate).count() rows = (s.query(Candidate) .order_by(desc(Candidate.created_at)) .offset(page * PAGE_SIZE).limit(PAGE_SIZE).all()) if not rows: await update.message.reply_text(t("list.empty")) return for c in rows: if c.avatar_file_id: await update.message.reply_photo(photo=c.avatar_file_id, caption=_caption(c), reply_markup=cand_kb(c)) else: await update.message.reply_text(text=_caption(c), reply_markup=cand_kb(c)) kb = pager_kb("candlist", page, has_prev=False, has_next=(total > (page + 1) * PAGE_SIZE)) if kb: await update.message.reply_text(" ", reply_markup=kb) # отдельным сообщением панель пагинации async def candlist_pager(update: Update, context: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() _, _, page_str = q.data.split(":") # candlist:page: page = max(0, int(page_str)) context.user_data["candlist_page"] = page with db_session() as s: if not AdminRepository(s).is_admin(update.effective_user.id): await q.edit_message_text(t("err.not_allowed")) return total = s.query(Candidate).count() rows = (s.query(Candidate) .order_by(desc(Candidate.created_at)) .offset(page * PAGE_SIZE).limit(PAGE_SIZE).all()) # Обновляем "панель" пагинации (это отдельное сообщение) kb = pager_kb("candlist", page, has_prev=(page > 0), has_next=(total > (page + 1) * PAGE_SIZE)) try: await q.edit_message_reply_markup(kb) except Exception: # если не панель, а текст — просто обновим текст await q.edit_message_text(" ", reply_markup=kb) # а список карточек отправляем следующими сообщениями chat_id = q.message.chat.id for c in rows: if c.avatar_file_id: await q.bot.send_photo(chat_id=chat_id, photo=c.avatar_file_id, caption=_caption(c), reply_markup=cand_kb(c)) else: await q.bot.send_message(chat_id=chat_id, text=_caption(c), reply_markup=cand_kb(c)) # ======================== Кнопки карточек =================================== async def callback_router(update: Update, context: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() data = q.data or "" if not data.startswith("cand:"): return _, action, id_str = data.split(":") cid = int(id_str) with db_session() as s: if not AdminRepository(s).is_admin(update.effective_user.id): if q.message.photo: await q.edit_message_caption(caption=t("err.not_allowed")) else: await q.edit_message_text(t("err.not_allowed")) return c = s.get(Candidate, cid) if not c: if q.message.photo: await q.edit_message_caption(caption=t("err.not_found")) else: await q.edit_message_text(t("err.not_found")) return if action == "view": cap = _caption(c) if q.message.photo: await q.edit_message_caption(caption=cap, reply_markup=cand_kb(c)) else: await q.edit_message_text(text=cap, reply_markup=cand_kb(c)) return if action == "verify": c.is_verified = True s.commit() cap = _caption(c) if q.message.photo: await q.edit_message_caption(caption=cap, reply_markup=cand_kb(c)) else: await q.edit_message_text(text=cap, reply_markup=cand_kb(c)) return if action == "toggle": c.is_active = not c.is_active s.commit() cap = _caption(c) if q.message.photo: await q.edit_message_caption(caption=cap, reply_markup=cand_kb(c)) else: await q.edit_message_text(text=cap, reply_markup=cand_kb(c)) return # ============================= /view =================================== async def view_candidate_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): with db_session() as s: if not AdminRepository(s).is_admin(update.effective_user.id): await update.message.reply_text(t("err.not_allowed")) return args = context.args or [] if not args: await update.message.reply_text("Формат: /view 12") return try: cid = int(args[0]) except ValueError: await update.message.reply_text("Укажи числовой id анкеты.") return c = s.get(Candidate, cid) if not c: await update.message.reply_text(t("err.not_found")) return caption = _caption(c) chat_id = update.effective_chat.id if c.avatar_file_id: await context.bot.send_photo(chat_id=chat_id, photo=c.avatar_file_id, caption=caption, reply_markup=cand_kb(c)) else: await context.bot.send_message(chat_id=chat_id, text=caption, reply_markup=cand_kb(c)) gallery_ids = csv_to_list(c.gallery_file_ids) if gallery_ids: batch: List[InputMediaPhoto] = [] for fid in gallery_ids: batch.append(InputMediaPhoto(media=fid)) if len(batch) == 10: await context.bot.send_media_group(chat_id=chat_id, media=batch) batch = [] if batch: await context.bot.send_media_group(chat_id=chat_id, media=batch) # ===================== /list — фильтры + пагинация ========================== F_GENDER, F_CITY, F_VERIFIED, F_ACTIVE = range(4) @dataclass class ListFilters: gender: Optional[str] = None city: Optional[str] = None verified: Optional[bool] = None active: Optional[bool] = True def _kb_gender() -> InlineKeyboardMarkup: return InlineKeyboardMarkup([ [InlineKeyboardButton(t("list.gender_f"), callback_data="list:gender:f"), InlineKeyboardButton(t("list.gender_m"), callback_data="list:gender:m")], [InlineKeyboardButton(t("list.skip"), callback_data="list:gender:skip")], ]) def _kb_yesno(prefix: str) -> InlineKeyboardMarkup: return InlineKeyboardMarkup([ [InlineKeyboardButton("Да", callback_data=f"list:{prefix}:yes"), InlineKeyboardButton("Нет", callback_data=f"list:{prefix}:no")], [InlineKeyboardButton(t("list.skip"), callback_data=f"list:{prefix}:skip")], ]) async def list_start(update: Update, context: ContextTypes.DEFAULT_TYPE): with db_session() as s: if not AdminRepository(s).is_admin(update.effective_user.id): await update.message.reply_text(t("err.not_allowed")) return ConversationHandler.END context.user_data["list_filters"] = ListFilters() context.user_data["list_page"] = 0 await update.message.reply_text(t("list.start_gender"), reply_markup=_kb_gender()) return F_GENDER async def list_gender(update: Update, context: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() _, _, val = q.data.split(":") # list:gender: lf: ListFilters = context.user_data.get("list_filters") # type: ignore if val == "f": lf.gender = t("list.gender_f") elif val == "m": lf.gender = t("list.gender_m") await q.edit_message_text(t("list.ask_city"), reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(t("list.skip"), callback_data="list:city:skip")]])) return F_CITY async def list_city(update: Update, context: ContextTypes.DEFAULT_TYPE): lf: ListFilters = context.user_data.get("list_filters") # type: ignore if update.callback_query: await update.callback_query.answer() await update.callback_query.edit_message_text(t("list.ask_verified")) await update.callback_query.message.reply_text(t("list.ask_verified"), reply_markup=_kb_yesno("verified")) return F_VERIFIED city = (update.message.text or "").strip() if city.lower() != "пропустить": lf.city = city await update.message.reply_text(t("list.ask_verified"), reply_markup=_kb_yesno("verified")) return F_VERIFIED async def list_verified(update: Update, context: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() _, _, val = q.data.split(":") lf: ListFilters = context.user_data.get("list_filters") # type: ignore if val == "yes": lf.verified = True elif val == "no": lf.verified = False await q.edit_message_text(t("list.ask_active")) await q.message.reply_text(t("list.ask_active"), reply_markup=_kb_yesno("active")) return F_ACTIVE async def list_active(update: Update, context: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() _, _, val = q.data.split(":") lf: ListFilters = context.user_data.get("list_filters") # type: ignore if val == "yes": lf.active = True elif val == "no": lf.active = False context.user_data["list_page"] = 0 await q.edit_message_text(t("list.searching")) await _list_show_results(update, context) return ConversationHandler.END async def _list_show_results(update: Update, context: ContextTypes.DEFAULT_TYPE, page: Optional[int] = None): lf: ListFilters = context.user_data.get("list_filters") # type: ignore page = context.user_data.get("list_page", 0) if page is None else page with db_session() as s: if not AdminRepository(s).is_admin(update.effective_user.id): await context.bot.send_message(chat_id=update.effective_chat.id, text=t("err.not_allowed")) return q = s.query(Candidate) if lf.gender: q = q.filter(Candidate.gender == lf.gender) if lf.city: q = q.filter(Candidate.city.ilike(lf.city)) if lf.verified is not None: q = q.filter(Candidate.is_verified.is_(lf.verified)) if lf.active is not None: q = q.filter(Candidate.is_active.is_(lf.active)) total = q.count() rows = q.order_by(desc(Candidate.created_at)).offset(page * PAGE_SIZE).limit(PAGE_SIZE).all() if not rows: await context.bot.send_message(chat_id=update.effective_chat.id, text=t("list.empty")) return # Отправим карточки for c in rows: cap = _caption(c) if c.avatar_file_id: await context.bot.send_photo(chat_id=update.effective_chat.id, photo=c.avatar_file_id, caption=cap, reply_markup=cand_kb(c)) else: await context.bot.send_message(chat_id=update.effective_chat.id, text=cap, reply_markup=cand_kb(c)) # Панель пагинации результатов /list kb = pager_kb("listres", page, has_prev=(page > 0), has_next=(total > (page + 1) * PAGE_SIZE)) if kb: await context.bot.send_message(chat_id=update.effective_chat.id, text=" ", reply_markup=kb) async def list_results_pager(update: Update, context: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() _, _, page_str = q.data.split(":") # listres:page: page = max(0, int(page_str)) context.user_data["list_page"] = page # просто обновляем "панель" и заново выводим карточки try: await q.edit_message_reply_markup(reply_markup=None) # убираем старую панель (по возможности) except Exception: pass await _list_show_results(update, context)