Files
tg_post_min/app/bot/handlers/security.py

343 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
from datetime import datetime, timedelta
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.constants import ChatType
from telegram.ext import ContextTypes
from sqlalchemy import func
from app.db.session import get_session
from app.db.models import (
User, SecurityPolicy, ChatSecurity,
SpamDictionary, DictionaryEntry, PolicyDictionaryLink,
)
from app.bot.keyboards.security import kb_policy
# ---------- helpers ----------
def _get_or_create_policy(session, owner_tg_id: int) -> SecurityPolicy:
u = session.query(User).filter_by(tg_id=owner_tg_id).first()
if not u:
u = User(tg_id=owner_tg_id, name="")
session.add(u); session.commit(); session.refresh(u)
p = session.query(SecurityPolicy).filter_by(owner_user_id=u.id, name="Balanced").first()
if p:
return p
p = SecurityPolicy(owner_user_id=u.id, name="Balanced")
session.add(p); session.commit(); session.refresh(p)
return p
def _parse_params(raw: str | None, fallback_name: str) -> dict:
params = {"name": fallback_name or "dict", "category": "custom", "kind": "plain", "lang": None}
if raw:
for kv in raw.split(";"):
if "=" in kv:
k, v = kv.strip().split("=", 1)
params[k.strip()] = v.strip()
params["name"] = (params["name"] or "dict")[:120]
params["category"] = (params.get("category") or "custom").lower()
params["kind"] = (params.get("kind") or "plain").lower()
return params
def _decode_bytes(b: bytes) -> str:
for enc in ("utf-8", "cp1251", "latin-1"):
try:
return b.decode(enc)
except Exception:
continue
return b.decode("utf-8", "ignore")
def _import_entries(session, owner_tg_id: int, params: dict, entries: list[str]) -> int:
# ensure owner
u = session.query(User).filter_by(tg_id=owner_tg_id).first()
if not u:
u = User(tg_id=owner_tg_id, name="")
session.add(u); session.commit(); session.refresh(u)
# создать словарь
d = SpamDictionary(
owner_user_id=u.id,
name=params["name"], category=params["category"],
kind=params["kind"], lang=params.get("lang"),
)
session.add(d); session.commit(); session.refresh(d)
# автопривязка к дефолт-политике владельца (Balanced), чтобы потом /dicts работал сразу
p = _get_or_create_policy(session, owner_tg_id)
if not session.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first():
session.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id))
session.commit()
# добавить записи
n = 0
for pat in entries:
pat = pat.strip()
if not pat or pat.startswith("#"):
continue
session.add(DictionaryEntry(dictionary_id=d.id, pattern=pat, is_regex=(params["kind"] == "regex")))
n += 1
session.commit()
return n
# ---------- /security ----------
async def security_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
with get_session() as s:
p = _get_or_create_policy(s, update.effective_user.id)
chat = update.effective_chat
bound = enabled = False
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if cs and cs.policy_id == p.id:
bound, enabled = True, cs.enabled
# показать панель
msg = await update.effective_message.reply_text(
f"Политика «{p.name}».",
reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled)
)
# ----- ЗАМОК панели: только инициатор может нажимать -----
locks = ctx.chat_data.setdefault("security_locks", {})
locks[msg.message_id] = update.effective_user.id
# TTL/очистку можно сделать по таймеру, если нужно
async def security_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
# замок на сообщение
lock_owner = ctx.chat_data.get("security_locks", {}).get(q.message.message_id)
if lock_owner and lock_owner != update.effective_user.id:
await q.answer("Эта панель открыта другим админом. Запустите /security для своей.", show_alert=True)
return
# должен быть админом чата
try:
m = await ctx.bot.get_chat_member(update.effective_chat.id, update.effective_user.id)
if m.status not in ("administrator", "creator"):
await q.answer("Недостаточно прав.", show_alert=True); return
except Exception:
await q.answer("Недостаточно прав.", show_alert=True); return
parts = (q.data or "").split(":")
if len(parts) < 2 or parts[0] != "pol":
return
action = parts[1]
with get_session() as s:
pid = int(parts[-1])
p = s.get(SecurityPolicy, pid)
if not p:
await q.edit_message_text("Политика не найдена.")
return
if action == "toggle":
field = parts[2]
setattr(p, field, not getattr(p, field))
s.commit()
elif action == "adj":
field, delta = parts[2], int(parts[3])
val = getattr(p, field)
setattr(p, field, max(0, val + delta))
s.commit()
elif action == "cycle_action":
order = ["delete", "warn", "timeout", "ban", "none"]
cur = p.enforce_action_default
p.enforce_action_default = order[(order.index(cur) + 1) % len(order)] if cur in order else "delete"
s.commit()
elif action == "bind_here":
chat = update.effective_chat
if chat.type not in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
await q.edit_message_text("Жмите в группе/канале.")
return
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if not cs:
cs = ChatSecurity(chat_id=chat.id, policy_id=p.id, enabled=False)
s.add(cs)
else:
cs.policy_id = p.id
s.commit()
elif action == "toggle_chat":
chat = update.effective_chat
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id, policy_id=pid).first()
if cs:
cs.enabled = not cs.enabled
s.commit()
# перерисовать клавиатуру с учётом чата
chat = update.effective_chat
bound = enabled = False
if chat and chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if cs and cs.policy_id == p.id:
bound, enabled = True, cs.enabled
await q.edit_message_reply_markup(reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled))
# ---------- словари: список и тоггл ----------
def _kb_dicts(policy_id: int, rows: list[tuple[int, str, bool]]):
# rows: [(dict_id, "Имя (категория/kind)", is_linked)]
kbd = []
for did, title, linked in rows:
mark = "" if linked else "▫️"
kbd.append([InlineKeyboardButton(f"{mark} {title}", callback_data=f"dict:toggle:{policy_id}:{did}")])
return InlineKeyboardMarkup(kbd) if kbd else None
async def dicts_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
chat = update.effective_chat
user_id = update.effective_user.id
# админ-проверка в группах
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
try:
m = await ctx.bot.get_chat_member(chat.id, user_id)
if m.status not in ("administrator", "creator"):
return
except Exception:
return
with get_session() as s:
# выбрать политику: в группе — политика чата; в ЛС — дефолт владельца
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if not cs:
await update.effective_message.reply_text("Политика не привязана. Откройте /security и привяжите к чату.")
return
p = s.get(SecurityPolicy, cs.policy_id)
else:
p = _get_or_create_policy(s, user_id)
u = s.query(User).filter_by(tg_id=user_id).first()
dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all()
linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()}
rows = []
for d in dicts[:100]:
title = f"{d.name} ({d.category}/{d.kind})"
rows.append((d.id, title, d.id in linked))
kb = _kb_dicts(p.id, rows)
if not rows:
await update.effective_message.reply_text("У вас пока нет словарей. Импортируйте через /spam_import в ЛС.")
return
await update.effective_message.reply_text(
f"Словари для политики «{p.name}» (нажмите для прикрепления/открепления):",
reply_markup=kb
)
async def dicts_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
q = update.callback_query
await q.answer()
parts = (q.data or "").split(":")
if len(parts) != 4 or parts[0] != "dict" or parts[1] != "toggle":
return
policy_id = int(parts[2]); dict_id = int(parts[3])
# только админ
try:
m = await ctx.bot.get_chat_member(update.effective_chat.id, update.effective_user.id)
if m.status not in ("administrator", "creator"):
await q.answer("Недостаточно прав.", show_alert=True); return
except Exception:
await q.answer("Недостаточно прав.", show_alert=True); return
with get_session() as s:
p = s.get(SecurityPolicy, policy_id)
if not p:
await q.edit_message_text("Политика не найдена.")
return
link = s.query(PolicyDictionaryLink).filter_by(policy_id=policy_id, dictionary_id=dict_id).first()
if link:
s.delete(link); s.commit()
else:
s.add(PolicyDictionaryLink(policy_id=policy_id, dictionary_id=dict_id)); s.commit()
u = s.query(User).filter_by(tg_id=update.effective_user.id).first()
dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all()
linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()}
rows = []
for d in dicts[:100]:
title = f"{d.name} ({d.category}/{d.kind})"
rows.append((d.id, title, d.id in linked))
await q.edit_message_reply_markup(reply_markup=_kb_dicts(p.id, rows))
# ---------- импорт словаря ----------
async def spam_import_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
# только в ЛС; в main.py уже стоит фильтр, но на всякий
if update.effective_chat.type != ChatType.PRIVATE:
await update.effective_message.reply_text("Эту команду нужно выполнить в личке со мной.")
return
ctx.user_data["await_dict_file"] = True
ctx.user_data.pop("dict_params", None)
await update.effective_message.reply_text(
"Пришлите .txt/.csv ФАЙЛОМ — один паттерн на строку.\n"
"Подпись (необязательно): name=RU_spam; category=spam|scam|adult|profanity|custom; kind=plain|regex; lang=ru"
)
async def spam_import_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if update.effective_chat.type != ChatType.PRIVATE or not ctx.user_data.get("await_dict_file"):
return
doc = update.message.document if update.message else None
if not doc:
return
await update.effective_message.reply_text(f"Файл получен: {doc.file_name or 'без имени'} — импортирую…")
try:
f = await doc.get_file()
bio = io.BytesIO()
await f.download_to_memory(out=bio)
bio.seek(0)
text = _decode_bytes(bio.read())
lines = [l.strip() for l in text.splitlines() if l.strip()]
if not lines:
await update.effective_message.reply_text("Файл пуст. Добавьте строки с паттернами.")
return
params = _parse_params(update.message.caption, doc.file_name or "dict")
with get_session() as s:
n = _import_entries(s, update.effective_user.id, params, lines)
ctx.user_data.pop("await_dict_file", None)
await update.effective_message.reply_text(f"Импортировано {n} записей в словарь «{params['name']}».")
except Exception as e:
await update.effective_message.reply_text(f"Ошибка импорта: {e}")
async def spam_import_text_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
# перехватываем только в ЛС и только если ждём словарь — И НЕ блокируем цепочку (block=False в main.py)
if update.effective_chat.type != ChatType.PRIVATE or not ctx.user_data.get("await_dict_file"):
return
txt = (update.effective_message.text or "").strip()
if not txt:
return
# если текст похож на "параметры", просто запомним и попросим файл
if ("=" in txt) and (";" in txt) and (len(txt.split()) <= 6):
ctx.user_data["dict_params"] = txt
await update.effective_message.reply_text("Параметры принял. Теперь пришлите .txt/.csv ФАЙЛОМ со словарём.")
return
# иначе считаем, что прислан словарь "в лоб"
lines = [l.strip() for l in txt.splitlines() if l.strip()]
params = _parse_params(ctx.user_data.get("dict_params"), "inline_dict")
try:
with get_session() as s:
n = _import_entries(s, update.effective_user.id, params, lines)
ctx.user_data.pop("await_dict_file", None)
ctx.user_data.pop("dict_params", None)
await update.effective_message.reply_text(
f"Импортировано {n} записей (из текста) в словарь «{params['name']}»."
)
except Exception as e:
await update.effective_message.reply_text(f"Ошибка импорта: {e}")