# app/bot/handlers/dict_commands.py from telegram import Update from telegram.constants import ChatType from telegram.ext import ContextTypes from sqlalchemy import func from app.db.session import get_session from app.db.models import ( User, SecurityPolicy, ChatSecurity, SpamDictionary, PolicyDictionaryLink, DictionaryEntry, ) ALLOWED_CATEGORIES = {"spam", "scam", "adult", "profanity", "custom"} def _get_or_create_policy(session, owner_tg_id: int) -> SecurityPolicy: u = session.query(User).filter_by(tg_id=owner_tg_id).first() if not u: u = User(tg_id=owner_tg_id, name="") session.add(u); session.commit(); session.refresh(u) p = session.query(SecurityPolicy).filter_by(owner_user_id=u.id, name="Balanced").first() if p: return p p = SecurityPolicy(owner_user_id=u.id, name="Balanced") session.add(p); session.commit(); session.refresh(p) return p async def _is_group_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> bool: """В группе/канале разрешаем только администраторам.""" chat = update.effective_chat if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL): try: m = await ctx.bot.get_chat_member(chat.id, update.effective_user.id) return m.status in ("administrator", "creator") except Exception: return False return True def _resolve_policy_and_owner(session, update: Update) -> tuple[SecurityPolicy, User, str]: """ В группе/канале: берём политику, привязанную к чату. В ЛС: дефолтную политику Balanced владельца. """ chat = update.effective_chat user = session.query(User).filter_by(tg_id=update.effective_user.id).first() if not user: user = User(tg_id=update.effective_user.id, name=update.effective_user.full_name or "") session.add(user); session.commit(); session.refresh(user) if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL): cs = session.query(ChatSecurity).filter_by(chat_id=chat.id).first() if not cs: raise RuntimeError("not_bound") p = session.get(SecurityPolicy, cs.policy_id) if not p: raise RuntimeError("policy_missing") return p, user, f"политика чата «{chat.title or chat.id}»" else: p = _get_or_create_policy(session, update.effective_user.id) return p, user, "ваша политика Balanced (ЛС)" def _find_dict(session, owner: User, token: str) -> SpamDictionary | None: """token — numeric id или точное имя словаря владельца (без глобальных).""" token = (token or "").strip() if not token: return None if token.isdigit(): d = session.get(SpamDictionary, int(token)) return d if d and d.owner_user_id == owner.id else None return ( session.query(SpamDictionary) .filter(SpamDictionary.owner_user_id == owner.id, SpamDictionary.name == token) .first() ) # ===================== Команды ===================== async def dicts_link_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not await _is_group_admin(update, ctx): return args = ctx.args or [] if not args: await update.effective_message.reply_text("Использование: /dicts_link ") return token = " ".join(args).strip() with get_session() as s: try: p, owner, scope = _resolve_policy_and_owner(s, update) except RuntimeError as e: if str(e) == "not_bound": await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».") return await update.effective_message.reply_text("Политика недоступна. Привяжите её через /security.") return d = _find_dict(s, owner, token) if not d: await update.effective_message.reply_text(f"Словарь не найден среди ваших: «{token}». Проверьте id/имя (см. /dicts).") return link = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first() if link: await update.effective_message.reply_text(f"Уже привязан: «{d.name}» ({scope}).") return s.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id)); s.commit(); from app.moderation.engine import dict_cache dict_cache.invalidate(p.id) rules = s.query(DictionaryEntry).filter_by(dictionary_id=d.id).count() await update.effective_message.reply_text(f"Привязал «{d.name}» ({d.category}/{d.kind}, правил: {rules}) к {scope}.") async def dicts_unlink_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not await _is_group_admin(update, ctx): return args = ctx.args or [] if not args: await update.effective_message.reply_text("Использование: /dicts_unlink ") return token = " ".join(args).strip() with get_session() as s: try: p, owner, scope = _resolve_policy_and_owner(s, update) except RuntimeError: await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».") return d = _find_dict(s, owner, token) if not d: await update.effective_message.reply_text(f"Словарь не найден: «{token}».") return link = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first() if not link: await update.effective_message.reply_text(f"Словарь «{d.name}» и так не привязан к {scope}.") return s.delete(link); s.commit() from app.moderation.engine import dict_cache dict_cache.invalidate(p.id) await update.effective_message.reply_text(f"Отвязал «{d.name}» от {scope}.") async def dicts_link_all_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not await _is_group_admin(update, ctx): return args = ctx.args or [] cat = (args[0].lower() if args else None) if cat and cat not in ALLOWED_CATEGORIES: await update.effective_message.reply_text(f"Категория неизвестна. Используйте: {', '.join(sorted(ALLOWED_CATEGORIES))} (или без аргумента).") return with get_session() as s: try: p, owner, scope = _resolve_policy_and_owner(s, update) except RuntimeError: await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».") return q = s.query(SpamDictionary).filter_by(owner_user_id=owner.id) if cat: q = q.filter(SpamDictionary.category == cat) ds = q.all() if not ds: await update.effective_message.reply_text("У вас нет словарей для привязки (см. /spam_import).") return existing = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()} added = 0 for d in ds: if d.id in existing: continue s.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id)); added += 1 s.commit() from app.moderation.engine import dict_cache dict_cache.invalidate(p.id) await update.effective_message.reply_text(f"Привязал {added} словарей к {scope}." + (f" (категория: {cat})" if cat else "")) async def dicts_unlink_all_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not await _is_group_admin(update, ctx): return args = ctx.args or [] cat = (args[0].lower() if args else None) if cat and cat not in ALLOWED_CATEGORIES: await update.effective_message.reply_text(f"Категория неизвестна. Используйте: {', '.join(sorted(ALLOWED_CATEGORIES))} (или без аргумента).") return with get_session() as s: try: p, owner, scope = _resolve_policy_and_owner(s, update) except RuntimeError: await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».") return q = ( s.query(SpamDictionary) .join(PolicyDictionaryLink, PolicyDictionaryLink.dictionary_id == SpamDictionary.id) .filter(PolicyDictionaryLink.policy_id == p.id, SpamDictionary.owner_user_id == owner.id) ) if cat: q = q.filter(SpamDictionary.category == cat) ds = q.all() if not ds: await update.effective_message.reply_text("Нет привязанных словарей для отвязки.") return deleted = 0 for d in ds: link = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first() if link: s.delete(link); deleted += 1 s.commit() from app.moderation.engine import dict_cache dict_cache.invalidate(p.id) await update.effective_message.reply_text(f"Отвязал {deleted} словарей от {scope}." + (f" (категория: {cat})" if cat else "")) async def dicts_linked_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not await _is_group_admin(update, ctx): return with get_session() as s: try: p, owner, scope = _resolve_policy_and_owner(s, update) except RuntimeError: await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».") return links = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all() if not links: await update.effective_message.reply_text(f"К {scope} пока ничего не привязано.") return d_ids = [lnk.dictionary_id for lnk in links] dicts = s.query(SpamDictionary).filter(SpamDictionary.id.in_(d_ids)).all() counts = { d.id: s.query(func.count(DictionaryEntry.id)).filter(DictionaryEntry.dictionary_id == d.id).scalar() or 0 for d in dicts } lines = [f"Привязано к {scope}:"] for d in dicts: lines.append(f"• [{d.id}] {d.name} ({d.category}/{d.kind}) — правил: {counts.get(d.id, 0)}") await update.effective_message.reply_text("\n".join(lines))