346 lines
15 KiB
Python
346 lines
15 KiB
Python
import io
|
||
from datetime import datetime, timedelta
|
||
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
|
||
from telegram.constants import ChatType
|
||
from telegram.ext import ContextTypes
|
||
from sqlalchemy import func
|
||
|
||
from app.db.session import get_session
|
||
from app.db.models import (
|
||
User, SecurityPolicy, ChatSecurity,
|
||
SpamDictionary, DictionaryEntry, PolicyDictionaryLink,
|
||
)
|
||
from app.bot.keyboards.security import kb_policy
|
||
from app.moderation.engine import dict_cache
|
||
|
||
# ---------- helpers ----------
|
||
def _get_or_create_policy(session, owner_tg_id: int) -> SecurityPolicy:
|
||
u = session.query(User).filter_by(tg_id=owner_tg_id).first()
|
||
if not u:
|
||
u = User(tg_id=owner_tg_id, name="")
|
||
session.add(u); session.commit(); session.refresh(u)
|
||
p = session.query(SecurityPolicy).filter_by(owner_user_id=u.id, name="Balanced").first()
|
||
if p:
|
||
return p
|
||
p = SecurityPolicy(owner_user_id=u.id, name="Balanced")
|
||
session.add(p); session.commit(); session.refresh(p)
|
||
return p
|
||
|
||
|
||
def _parse_params(raw: str | None, fallback_name: str) -> dict:
|
||
params = {"name": fallback_name or "dict", "category": "custom", "kind": "plain", "lang": None}
|
||
if raw:
|
||
for kv in raw.split(";"):
|
||
if "=" in kv:
|
||
k, v = kv.strip().split("=", 1)
|
||
params[k.strip()] = v.strip()
|
||
params["name"] = (params["name"] or "dict")[:120]
|
||
params["category"] = (params.get("category") or "custom").lower()
|
||
params["kind"] = (params.get("kind") or "plain").lower()
|
||
return params
|
||
|
||
|
||
def _decode_bytes(b: bytes) -> str:
|
||
for enc in ("utf-8", "cp1251", "latin-1"):
|
||
try:
|
||
return b.decode(enc)
|
||
except Exception:
|
||
continue
|
||
return b.decode("utf-8", "ignore")
|
||
|
||
|
||
def _import_entries(session, owner_tg_id: int, params: dict, entries: list[str]) -> int:
|
||
# ensure owner
|
||
u = session.query(User).filter_by(tg_id=owner_tg_id).first()
|
||
if not u:
|
||
u = User(tg_id=owner_tg_id, name="")
|
||
session.add(u)
|
||
session.commit()
|
||
session.refresh(u)
|
||
|
||
# создать словарь
|
||
d = SpamDictionary(
|
||
owner_user_id=u.id,
|
||
name=params["name"], category=params["category"],
|
||
kind=params["kind"], lang=params.get("lang"),
|
||
)
|
||
session.add(d); session.commit(); session.refresh(d)
|
||
|
||
# автопривязка к дефолт-политике владельца (Balanced), чтобы потом /dicts работал сразу
|
||
p = _get_or_create_policy(session, owner_tg_id)
|
||
if not session.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first():
|
||
session.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id))
|
||
session.commit()
|
||
|
||
# добавить записи
|
||
n = 0
|
||
for pat in entries:
|
||
pat = pat.strip()
|
||
if not pat or pat.startswith("#"):
|
||
continue
|
||
session.add(DictionaryEntry(dictionary_id=d.id, pattern=pat, is_regex=(params["kind"] == "regex")))
|
||
n += 1
|
||
session.commit()
|
||
return n
|
||
|
||
|
||
# ---------- /security ----------
|
||
async def security_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||
with get_session() as s:
|
||
p = _get_or_create_policy(s, update.effective_user.id)
|
||
chat = update.effective_chat
|
||
bound = enabled = False
|
||
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
|
||
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
|
||
if cs and cs.policy_id == p.id:
|
||
bound, enabled = True, cs.enabled
|
||
|
||
# показать панель
|
||
msg = await update.effective_message.reply_text(
|
||
f"Политика «{p.name}».",
|
||
reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled)
|
||
)
|
||
|
||
# ----- ЗАМОК панели: только инициатор может нажимать -----
|
||
locks = ctx.chat_data.setdefault("security_locks", {})
|
||
locks[msg.message_id] = update.effective_user.id
|
||
# TTL/очистку можно сделать по таймеру, если нужно
|
||
|
||
|
||
async def security_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||
q = update.callback_query
|
||
await q.answer()
|
||
|
||
# замок на сообщение
|
||
lock_owner = ctx.chat_data.get("security_locks", {}).get(q.message.message_id)
|
||
if lock_owner and lock_owner != update.effective_user.id:
|
||
await q.answer("Эта панель открыта другим админом. Запустите /security для своей.", show_alert=True)
|
||
return
|
||
|
||
# должен быть админом чата
|
||
try:
|
||
m = await ctx.bot.get_chat_member(update.effective_chat.id, update.effective_user.id)
|
||
if m.status not in ("administrator", "creator"):
|
||
await q.answer("Недостаточно прав.", show_alert=True); return
|
||
except Exception:
|
||
await q.answer("Недостаточно прав.", show_alert=True); return
|
||
|
||
parts = (q.data or "").split(":")
|
||
if len(parts) < 2 or parts[0] != "pol":
|
||
return
|
||
action = parts[1]
|
||
|
||
with get_session() as s:
|
||
pid = int(parts[-1])
|
||
p = s.get(SecurityPolicy, pid)
|
||
if not p:
|
||
await q.edit_message_text("Политика не найдена.")
|
||
return
|
||
|
||
if action == "toggle":
|
||
field = parts[2]
|
||
setattr(p, field, not getattr(p, field))
|
||
s.commit()
|
||
dict_cache.invalidate(cs.policy_id)
|
||
|
||
elif action == "adj":
|
||
field, delta = parts[2], int(parts[3])
|
||
val = getattr(p, field)
|
||
setattr(p, field, max(0, val + delta))
|
||
s.commit()
|
||
|
||
elif action == "cycle_action":
|
||
order = ["delete", "warn", "timeout", "ban", "none"]
|
||
cur = p.enforce_action_default
|
||
p.enforce_action_default = order[(order.index(cur) + 1) % len(order)] if cur in order else "delete"
|
||
s.commit()
|
||
|
||
elif action == "bind_here":
|
||
chat = update.effective_chat
|
||
if chat.type not in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
|
||
await q.edit_message_text("Жмите в группе/канале.")
|
||
return
|
||
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
|
||
if not cs:
|
||
cs = ChatSecurity(chat_id=chat.id, policy_id=p.id, enabled=False)
|
||
s.add(cs)
|
||
else:
|
||
cs.policy_id = p.id
|
||
s.commit()
|
||
|
||
elif action == "toggle_chat":
|
||
chat = update.effective_chat
|
||
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id, policy_id=pid).first()
|
||
if cs:
|
||
cs.enabled = not cs.enabled
|
||
s.commit()
|
||
|
||
# перерисовать клавиатуру с учётом чата
|
||
chat = update.effective_chat
|
||
bound = enabled = False
|
||
if chat and chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
|
||
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
|
||
if cs and cs.policy_id == p.id:
|
||
bound, enabled = True, cs.enabled
|
||
|
||
await q.edit_message_reply_markup(reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled))
|
||
|
||
|
||
# ---------- словари: список и тоггл ----------
|
||
def _kb_dicts(policy_id: int, rows: list[tuple[int, str, bool]]):
|
||
# rows: [(dict_id, "Имя (категория/kind)", is_linked)]
|
||
kbd = []
|
||
for did, title, linked in rows:
|
||
mark = "✅" if linked else "▫️"
|
||
kbd.append([InlineKeyboardButton(f"{mark} {title}", callback_data=f"dict:toggle:{policy_id}:{did}")])
|
||
return InlineKeyboardMarkup(kbd) if kbd else None
|
||
|
||
|
||
async def dicts_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||
chat = update.effective_chat
|
||
user_id = update.effective_user.id
|
||
|
||
# админ-проверка в группах
|
||
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
|
||
try:
|
||
m = await ctx.bot.get_chat_member(chat.id, user_id)
|
||
if m.status not in ("administrator", "creator"):
|
||
return
|
||
except Exception:
|
||
return
|
||
|
||
with get_session() as s:
|
||
# выбрать политику: в группе — политика чата; в ЛС — дефолт владельца
|
||
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
|
||
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
|
||
if not cs:
|
||
await update.effective_message.reply_text("Политика не привязана. Откройте /security и привяжите к чату.")
|
||
return
|
||
p = s.get(SecurityPolicy, cs.policy_id)
|
||
else:
|
||
p = _get_or_create_policy(s, user_id)
|
||
|
||
u = s.query(User).filter_by(tg_id=user_id).first()
|
||
dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all()
|
||
linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()}
|
||
|
||
rows = []
|
||
for d in dicts[:100]:
|
||
title = f"{d.name} ({d.category}/{d.kind})"
|
||
rows.append((d.id, title, d.id in linked))
|
||
|
||
kb = _kb_dicts(p.id, rows)
|
||
if not rows:
|
||
await update.effective_message.reply_text("У вас пока нет словарей. Импортируйте через /spam_import в ЛС.")
|
||
return
|
||
await update.effective_message.reply_text(
|
||
f"Словари для политики «{p.name}» (нажмите для прикрепления/открепления):",
|
||
reply_markup=kb
|
||
)
|
||
|
||
|
||
async def dicts_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||
q = update.callback_query
|
||
await q.answer()
|
||
parts = (q.data or "").split(":")
|
||
if len(parts) != 4 or parts[0] != "dict" or parts[1] != "toggle":
|
||
return
|
||
policy_id = int(parts[2]); dict_id = int(parts[3])
|
||
|
||
# только админ
|
||
try:
|
||
m = await ctx.bot.get_chat_member(update.effective_chat.id, update.effective_user.id)
|
||
if m.status not in ("administrator", "creator"):
|
||
await q.answer("Недостаточно прав.", show_alert=True); return
|
||
except Exception:
|
||
await q.answer("Недостаточно прав.", show_alert=True); return
|
||
|
||
with get_session() as s:
|
||
p = s.get(SecurityPolicy, policy_id)
|
||
if not p:
|
||
await q.edit_message_text("Политика не найдена.")
|
||
return
|
||
link = s.query(PolicyDictionaryLink).filter_by(policy_id=policy_id, dictionary_id=dict_id).first()
|
||
if link:
|
||
s.delete(link); s.commit(); dict_cache.invalidate(policy_id)
|
||
else:
|
||
s.add(PolicyDictionaryLink(policy_id=policy_id, dictionary_id=dict_id)); s.commit(); dict_cache.invalidate(policy_id)
|
||
|
||
u = s.query(User).filter_by(tg_id=update.effective_user.id).first()
|
||
dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all()
|
||
linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()}
|
||
rows = []
|
||
for d in dicts[:100]:
|
||
title = f"{d.name} ({d.category}/{d.kind})"
|
||
rows.append((d.id, title, d.id in linked))
|
||
|
||
await q.edit_message_reply_markup(reply_markup=_kb_dicts(p.id, rows))
|
||
|
||
|
||
# ---------- импорт словаря ----------
|
||
async def spam_import_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||
# только в ЛС; в main.py уже стоит фильтр, но на всякий
|
||
if update.effective_chat.type != ChatType.PRIVATE:
|
||
await update.effective_message.reply_text("Эту команду нужно выполнить в личке со мной.")
|
||
return
|
||
ctx.user_data["await_dict_file"] = True
|
||
ctx.user_data.pop("dict_params", None)
|
||
await update.effective_message.reply_text(
|
||
"Пришлите .txt/.csv ФАЙЛОМ — один паттерн на строку.\n"
|
||
"Подпись (необязательно): name=RU_spam; category=spam|scam|adult|profanity|custom; kind=plain|regex; lang=ru"
|
||
)
|
||
|
||
|
||
async def spam_import_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||
if update.effective_chat.type != ChatType.PRIVATE or not ctx.user_data.get("await_dict_file"):
|
||
return
|
||
doc = update.message.document if update.message else None
|
||
if not doc:
|
||
return
|
||
await update.effective_message.reply_text(f"Файл получен: {doc.file_name or 'без имени'} — импортирую…")
|
||
try:
|
||
f = await doc.get_file()
|
||
bio = io.BytesIO()
|
||
await f.download_to_memory(out=bio)
|
||
bio.seek(0)
|
||
text = _decode_bytes(bio.read())
|
||
lines = [l.strip() for l in text.splitlines() if l.strip()]
|
||
if not lines:
|
||
await update.effective_message.reply_text("Файл пуст. Добавьте строки с паттернами.")
|
||
return
|
||
params = _parse_params(update.message.caption, doc.file_name or "dict")
|
||
with get_session() as s:
|
||
n = _import_entries(s, update.effective_user.id, params, lines)
|
||
ctx.user_data.pop("await_dict_file", None)
|
||
await update.effective_message.reply_text(f"Импортировано {n} записей в словарь «{params['name']}».")
|
||
except Exception as e:
|
||
await update.effective_message.reply_text(f"Ошибка импорта: {e}")
|
||
|
||
|
||
async def spam_import_text_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||
# перехватываем только в ЛС и только если ждём словарь — И НЕ блокируем цепочку (block=False в main.py)
|
||
if update.effective_chat.type != ChatType.PRIVATE or not ctx.user_data.get("await_dict_file"):
|
||
return
|
||
txt = (update.effective_message.text or "").strip()
|
||
if not txt:
|
||
return
|
||
# если текст похож на "параметры", просто запомним и попросим файл
|
||
if ("=" in txt) and (";" in txt) and (len(txt.split()) <= 6):
|
||
ctx.user_data["dict_params"] = txt
|
||
await update.effective_message.reply_text("Параметры принял. Теперь пришлите .txt/.csv ФАЙЛОМ со словарём.")
|
||
return
|
||
|
||
# иначе считаем, что прислан словарь "в лоб"
|
||
lines = [l.strip() for l in txt.splitlines() if l.strip()]
|
||
params = _parse_params(ctx.user_data.get("dict_params"), "inline_dict")
|
||
try:
|
||
with get_session() as s:
|
||
n = _import_entries(s, update.effective_user.id, params, lines)
|
||
ctx.user_data.pop("await_dict_file", None)
|
||
ctx.user_data.pop("dict_params", None)
|
||
await update.effective_message.reply_text(
|
||
f"Импортировано {n} записей (из текста) в словарь «{params['name']}»."
|
||
)
|
||
except Exception as e:
|
||
await update.effective_message.reply_text(f"Ошибка импорта: {e}")
|