Files
tg_post_min/app/bot/handlers/dict_commands.py
2025-08-28 04:28:37 +09:00

250 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/bot/handlers/dict_commands.py
from telegram import Update
from telegram.constants import ChatType
from telegram.ext import ContextTypes
from sqlalchemy import func
from app.db.session import get_session
from app.db.models import (
User, SecurityPolicy, ChatSecurity,
SpamDictionary, PolicyDictionaryLink, DictionaryEntry,
)
ALLOWED_CATEGORIES = {"spam", "scam", "adult", "profanity", "custom"}
def _get_or_create_policy(session, owner_tg_id: int) -> SecurityPolicy:
u = session.query(User).filter_by(tg_id=owner_tg_id).first()
if not u:
u = User(tg_id=owner_tg_id, name="")
session.add(u); session.commit(); session.refresh(u)
p = session.query(SecurityPolicy).filter_by(owner_user_id=u.id, name="Balanced").first()
if p:
return p
p = SecurityPolicy(owner_user_id=u.id, name="Balanced")
session.add(p); session.commit(); session.refresh(p)
return p
async def _is_group_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> bool:
"""В группе/канале разрешаем только администраторам."""
chat = update.effective_chat
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
try:
m = await ctx.bot.get_chat_member(chat.id, update.effective_user.id)
return m.status in ("administrator", "creator")
except Exception:
return False
return True
def _resolve_policy_and_owner(session, update: Update) -> tuple[SecurityPolicy, User, str]:
"""
В группе/канале: берём политику, привязанную к чату.
В ЛС: дефолтную политику Balanced владельца.
"""
chat = update.effective_chat
user = session.query(User).filter_by(tg_id=update.effective_user.id).first()
if not user:
user = User(tg_id=update.effective_user.id, name=update.effective_user.full_name or "")
session.add(user); session.commit(); session.refresh(user)
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
cs = session.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if not cs:
raise RuntimeError("not_bound")
p = session.get(SecurityPolicy, cs.policy_id)
if not p:
raise RuntimeError("policy_missing")
return p, user, f"политика чата «{chat.title or chat.id}»"
else:
p = _get_or_create_policy(session, update.effective_user.id)
return p, user, "ваша политика Balanced (ЛС)"
def _find_dict(session, owner: User, token: str) -> SpamDictionary | None:
"""token — numeric id или точное имя словаря владельца (без глобальных)."""
token = (token or "").strip()
if not token:
return None
if token.isdigit():
d = session.get(SpamDictionary, int(token))
return d if d and d.owner_user_id == owner.id else None
return (
session.query(SpamDictionary)
.filter(SpamDictionary.owner_user_id == owner.id, SpamDictionary.name == token)
.first()
)
# ===================== Команды =====================
async def dicts_link_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _is_group_admin(update, ctx):
return
args = ctx.args or []
if not args:
await update.effective_message.reply_text("Использование: /dicts_link <id|имя_словаря>")
return
token = " ".join(args).strip()
with get_session() as s:
try:
p, owner, scope = _resolve_policy_and_owner(s, update)
except RuntimeError as e:
if str(e) == "not_bound":
await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».")
return
await update.effective_message.reply_text("Политика недоступна. Привяжите её через /security.")
return
d = _find_dict(s, owner, token)
if not d:
await update.effective_message.reply_text(f"Словарь не найден среди ваших: «{token}». Проверьте id/имя (см. /dicts).")
return
link = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first()
if link:
await update.effective_message.reply_text(f"Уже привязан: «{d.name}» ({scope}).")
return
s.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id)); s.commit();
from app.moderation.engine import dict_cache
dict_cache.invalidate(p.id)
rules = s.query(DictionaryEntry).filter_by(dictionary_id=d.id).count()
await update.effective_message.reply_text(f"Привязал «{d.name}» ({d.category}/{d.kind}, правил: {rules}) к {scope}.")
async def dicts_unlink_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _is_group_admin(update, ctx):
return
args = ctx.args or []
if not args:
await update.effective_message.reply_text("Использование: /dicts_unlink <id|имя_словаря>")
return
token = " ".join(args).strip()
with get_session() as s:
try:
p, owner, scope = _resolve_policy_and_owner(s, update)
except RuntimeError:
await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».")
return
d = _find_dict(s, owner, token)
if not d:
await update.effective_message.reply_text(f"Словарь не найден: «{token}».")
return
link = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first()
if not link:
await update.effective_message.reply_text(f"Словарь «{d.name}» и так не привязан к {scope}.")
return
s.delete(link); s.commit()
from app.moderation.engine import dict_cache
dict_cache.invalidate(p.id)
await update.effective_message.reply_text(f"Отвязал «{d.name}» от {scope}.")
async def dicts_link_all_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _is_group_admin(update, ctx):
return
args = ctx.args or []
cat = (args[0].lower() if args else None)
if cat and cat not in ALLOWED_CATEGORIES:
await update.effective_message.reply_text(f"Категория неизвестна. Используйте: {', '.join(sorted(ALLOWED_CATEGORIES))} (или без аргумента).")
return
with get_session() as s:
try:
p, owner, scope = _resolve_policy_and_owner(s, update)
except RuntimeError:
await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».")
return
q = s.query(SpamDictionary).filter_by(owner_user_id=owner.id)
if cat:
q = q.filter(SpamDictionary.category == cat)
ds = q.all()
if not ds:
await update.effective_message.reply_text("У вас нет словарей для привязки (см. /spam_import).")
return
existing = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()}
added = 0
for d in ds:
if d.id in existing:
continue
s.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id)); added += 1
s.commit()
from app.moderation.engine import dict_cache
dict_cache.invalidate(p.id)
await update.effective_message.reply_text(f"Привязал {added} словарей к {scope}." + (f" (категория: {cat})" if cat else ""))
async def dicts_unlink_all_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _is_group_admin(update, ctx):
return
args = ctx.args or []
cat = (args[0].lower() if args else None)
if cat and cat not in ALLOWED_CATEGORIES:
await update.effective_message.reply_text(f"Категория неизвестна. Используйте: {', '.join(sorted(ALLOWED_CATEGORIES))} (или без аргумента).")
return
with get_session() as s:
try:
p, owner, scope = _resolve_policy_and_owner(s, update)
except RuntimeError:
await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».")
return
q = (
s.query(SpamDictionary)
.join(PolicyDictionaryLink, PolicyDictionaryLink.dictionary_id == SpamDictionary.id)
.filter(PolicyDictionaryLink.policy_id == p.id, SpamDictionary.owner_user_id == owner.id)
)
if cat:
q = q.filter(SpamDictionary.category == cat)
ds = q.all()
if not ds:
await update.effective_message.reply_text("Нет привязанных словарей для отвязки.")
return
deleted = 0
for d in ds:
link = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first()
if link:
s.delete(link); deleted += 1
s.commit()
from app.moderation.engine import dict_cache
dict_cache.invalidate(p.id)
await update.effective_message.reply_text(f"Отвязал {deleted} словарей от {scope}." + (f" (категория: {cat})" if cat else ""))
async def dicts_linked_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _is_group_admin(update, ctx):
return
with get_session() as s:
try:
p, owner, scope = _resolve_policy_and_owner(s, update)
except RuntimeError:
await update.effective_message.reply_text("Политика не привязана. В группе откройте /security → «Привязать к этому чату».")
return
links = s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()
if not links:
await update.effective_message.reply_text(f"К {scope} пока ничего не привязано.")
return
d_ids = [lnk.dictionary_id for lnk in links]
dicts = s.query(SpamDictionary).filter(SpamDictionary.id.in_(d_ids)).all()
counts = {
d.id: s.query(func.count(DictionaryEntry.id)).filter(DictionaryEntry.dictionary_id == d.id).scalar() or 0
for d in dicts
}
lines = [f"Привязано к {scope}:"]
for d in dicts:
lines.append(f"• [{d.id}] {d.name} ({d.category}/{d.kind}) — правил: {counts.get(d.id, 0)}")
await update.effective_message.reply_text("\n".join(lines))