Files
tg_post_min/app/bot/handlers/security.py
Andrey K. Choi c16ec54891 Bot become a Community Guard & Post send manager
added: dictionary support for censore
message/user management with dict triggers
2025-08-22 21:44:14 +09:00

247 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
from telegram import Update
from telegram.constants import ChatType
from telegram.ext import ContextTypes
from sqlalchemy import select, func
from app.db.session import get_session
from app.db.models import (
User, SecurityPolicy, ChatSecurity,
SpamDictionary, DictionaryEntry, PolicyDictionaryLink, # NEW: PolicyDictionaryLink
)
from app.bot.keyboards.security import kb_policy
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
def _get_or_create_policy(session, owner_tg_id: int) -> SecurityPolicy:
u = session.query(User).filter_by(tg_id=owner_tg_id).first()
if not u:
u = User(tg_id=owner_tg_id, name=""); session.add(u); session.commit(); session.refresh(u)
p = session.query(SecurityPolicy).filter_by(owner_user_id=u.id, name="Balanced").first()
if p: return p
p = SecurityPolicy(owner_user_id=u.id, name="Balanced")
session.add(p); session.commit(); session.refresh(p)
return p
def _parse_params(raw: str|None, fallback_name: str) -> dict:
params = {"name": fallback_name or "dict", "category":"custom", "kind":"plain", "lang":None}
if raw:
for kv in raw.split(";"):
if "=" in kv:
k,v = kv.strip().split("=",1)
params[k.strip()] = v.strip()
params["name"] = (params["name"] or "dict")[:120]
params["category"] = (params.get("category") or "custom").lower()
params["kind"] = (params.get("kind") or "plain").lower()
return params
def _decode_bytes(b: bytes) -> str:
for enc in ("utf-8","cp1251","latin-1"):
try: return b.decode(enc)
except Exception: pass
return b.decode("utf-8","ignore")
def _import_entries(session, owner_tg_id: int, params: dict, entries: list[str]) -> int:
# ensure owner
u = session.query(User).filter_by(tg_id=owner_tg_id).first()
if not u:
u = User(tg_id=owner_tg_id, name=""); session.add(u); session.commit(); session.refresh(u)
# словарь
d = SpamDictionary(
owner_user_id=u.id,
name=params["name"], category=params["category"],
kind=params["kind"], lang=params.get("lang"),
)
session.add(d); session.commit(); session.refresh(d)
# NEW: авто-привязка к дефолт-политике владельца (Balanced)
p = _get_or_create_policy(session, owner_tg_id)
exists = session.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first()
if not exists:
session.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id))
session.commit()
# записи
n = 0
for pat in entries:
pat = pat.strip()
if not pat or pat.startswith("#"): continue
session.add(DictionaryEntry(dictionary_id=d.id, pattern=pat, is_regex=(params["kind"]=="regex")))
n += 1
session.commit()
return n
async def security_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
with get_session() as s:
p = _get_or_create_policy(s, update.effective_user.id)
chat = update.effective_chat
bound=enabled=False
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if cs and cs.policy_id == p.id: bound, enabled = True, cs.enabled
await update.effective_message.reply_text(f"Политика «{p.name}»", reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled))
async def security_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
q = update.callback_query; await q.answer()
parts = q.data.split(":")
if parts[0] != "pol": return
action = parts[1]
with get_session() as s:
pid = int(parts[-1]); p = s.get(SecurityPolicy, pid)
if not p: await q.edit_message_text("Политика не найдена."); return
if action == "toggle":
field = parts[2]; setattr(p, field, not getattr(p, field)); s.commit()
elif action == "adj":
field, delta = parts[2], int(parts[3]); val = getattr(p, field); setattr(p, field, max(0, val+delta)); s.commit()
elif action == "cycle_action":
order = ["delete","warn","timeout","ban","none"]; cur=p.enforce_action_default
p.enforce_action_default = order[(order.index(cur)+1)%len(order)] if cur in order else "delete"; s.commit()
elif action == "bind_here":
chat = update.effective_chat
if chat.type not in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
await q.edit_message_text("Жмите в группе/канале."); return
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if not cs: cs = ChatSecurity(chat_id=chat.id, policy_id=p.id, enabled=False); s.add(cs)
else: cs.policy_id = p.id
s.commit()
elif action == "toggle_chat":
chat = update.effective_chat; cs = s.query(ChatSecurity).filter_by(chat_id=chat.id, policy_id=pid).first()
if cs: cs.enabled = not cs.enabled; s.commit()
# обновить клавиатуру
chat = update.effective_chat; bound=enabled=False
if chat and chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if cs and cs.policy_id == p.id: bound, enabled = True, cs.enabled
await q.edit_message_reply_markup(reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled))
# === Импорт словаря ===
async def spam_import_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
log.info("spam_import_cmd from %s", update.effective_user.id)
ctx.user_data["await_dict_file"] = True
ctx.user_data.pop("dict_params", None)
await update.effective_message.reply_text(
"Пришлите .txt/.csv ФАЙЛОМ — один паттерн на строку.\n"
"Подпись (необязательно): name=RU_spam; category=spam|scam|adult|profanity|custom; kind=plain|regex; lang=ru"
)
async def spam_import_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
log.info("spam_import_capture: doc=%s caption=%s",
bool(update.message and update.message.document),
update.message.caption if update.message else None)
# Обрабатываем только когда ждём файл
if not ctx.user_data.get("await_dict_file"):
return
doc = update.message.document if update.message else None
if not doc:
return
# ACK сразу, чтобы было видно, что бот работает
await update.effective_message.reply_text(f"Файл получен: {doc.file_name or 'без имени'} — импортирую…")
try:
file = await doc.get_file()
bio = io.BytesIO()
await file.download_to_memory(out=bio)
bio.seek(0)
text = _decode_bytes(bio.read())
lines = [l.strip() for l in text.splitlines() if l.strip()]
if not lines:
await update.effective_message.reply_text("Файл пуст. Добавьте строки с паттернами.")
return
params = _parse_params(update.message.caption, doc.file_name or "dict")
with get_session() as s:
n = _import_entries(s, update.effective_user.id, params, lines)
ctx.user_data.pop("await_dict_file", None)
await update.effective_message.reply_text(f"Импортировано {n} записей в словарь «{params['name']}».")
except Exception as e:
await update.effective_message.reply_text(f"Ошибка импорта: {e}")
# === (опционально) Импорт словаря из текста, если прислали без файла ===
async def spam_import_text_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
log.info("spam_import_text_capture: await=%s text_len=%s",
ctx.user_data.get("await_dict_file"),
len(update.effective_message.text or ""))
if not ctx.user_data.get("await_dict_file"):
return
txt = (update.effective_message.text or "").strip()
if not txt:
return
# Если похоже на «подпись» с параметрами — просто запомним и попросим файл
if ("=" in txt) and (";" in txt) and (len(txt.split()) <= 6):
ctx.user_data["dict_params"] = txt
await update.effective_message.reply_text("Параметры принял. Теперь пришлите .txt/.csv ФАЙЛОМ со словарём.")
return
# Иначе трактуем как словарь одной «пачкой»
lines = [l.strip() for l in txt.splitlines() if l.strip()]
params = _parse_params(ctx.user_data.get("dict_params"), "inline_dict")
try:
with get_session() as s:
n = _import_entries(s, update.effective_user.id, params, lines)
ctx.user_data.pop("await_dict_file", None)
ctx.user_data.pop("dict_params", None)
await update.effective_message.reply_text(f"Импортировано {n} записей (из текста) в словарь «{params['name']}».")
except Exception as e:
await update.effective_message.reply_text(f"Ошибка импорта: {e}")
def _kb_dicts(policy_id: int, rows: list[tuple[int,str,bool]]):
# rows: [(dict_id, "Имя (категория/kind)", is_linked)]
kbd = []
for did, title, linked in rows:
mark = "" if linked else "▫️"
kbd.append([InlineKeyboardButton(f"{mark} {title}", callback_data=f"dict:toggle:{policy_id}:{did}")])
return InlineKeyboardMarkup(kbd) if kbd else None
async def dicts_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
chat = update.effective_chat
with get_session() as s:
# Выбираем политику: в группе — привязанную к чату, иначе — дефолт владельца
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL):
cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first()
if not cs:
await update.effective_message.reply_text("Политика не привязана. Откройте /security и привяжите к чату.")
return
p = s.get(SecurityPolicy, cs.policy_id)
else:
p = _get_or_create_policy(s, update.effective_user.id)
# словари владельца
u = s.query(User).filter_by(tg_id=update.effective_user.id).first()
dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all()
linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()}
rows = []
for d in dicts[:50]: # первые 50
title = f"{d.name} ({d.category}/{d.kind})"
rows.append((d.id, title, d.id in linked))
kb = _kb_dicts(p.id, rows)
if not rows:
await update.effective_message.reply_text("У вас пока нет словарей. Импортируйте через /spam_import.")
return
await update.effective_message.reply_text(f"Словари для политики «{p.name}» (нажмите, чтобы прикрепить/открепить):", reply_markup=kb)
async def dicts_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
q = update.callback_query; await q.answer()
data = q.data.split(":")
if len(data) != 4 or data[0] != "dict" or data[1] != "toggle":
return
policy_id = int(data[2]); dict_id = int(data[3])
with get_session() as s:
p = s.get(SecurityPolicy, policy_id)
if not p:
await q.edit_message_text("Политика не найдена.")
return
link = s.query(PolicyDictionaryLink).filter_by(policy_id=policy_id, dictionary_id=dict_id).first()
if link:
s.delete(link); s.commit()
else:
s.add(PolicyDictionaryLink(policy_id=policy_id, dictionary_id=dict_id)); s.commit()
# перерисовать список
u = s.query(User).filter_by(tg_id=update.effective_user.id).first()
dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all()
linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()}
rows = []
for d in dicts[:50]:
title = f"{d.name} ({d.category}/{d.kind})"
rows.append((d.id, title, d.id in linked))
await q.edit_message_reply_markup(reply_markup=_kb_dicts(p.id, rows))