import io from datetime import datetime, timedelta from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton from telegram.constants import ChatType from telegram.ext import ContextTypes from sqlalchemy import func from app.db.session import get_session from app.db.models import ( User, SecurityPolicy, ChatSecurity, SpamDictionary, DictionaryEntry, PolicyDictionaryLink, ) from app.bot.keyboards.security import kb_policy # ---------- helpers ---------- def _get_or_create_policy(session, owner_tg_id: int) -> SecurityPolicy: u = session.query(User).filter_by(tg_id=owner_tg_id).first() if not u: u = User(tg_id=owner_tg_id, name="") session.add(u); session.commit(); session.refresh(u) p = session.query(SecurityPolicy).filter_by(owner_user_id=u.id, name="Balanced").first() if p: return p p = SecurityPolicy(owner_user_id=u.id, name="Balanced") session.add(p); session.commit(); session.refresh(p) return p def _parse_params(raw: str | None, fallback_name: str) -> dict: params = {"name": fallback_name or "dict", "category": "custom", "kind": "plain", "lang": None} if raw: for kv in raw.split(";"): if "=" in kv: k, v = kv.strip().split("=", 1) params[k.strip()] = v.strip() params["name"] = (params["name"] or "dict")[:120] params["category"] = (params.get("category") or "custom").lower() params["kind"] = (params.get("kind") or "plain").lower() return params def _decode_bytes(b: bytes) -> str: for enc in ("utf-8", "cp1251", "latin-1"): try: return b.decode(enc) except Exception: continue return b.decode("utf-8", "ignore") def _import_entries(session, owner_tg_id: int, params: dict, entries: list[str]) -> int: # ensure owner u = session.query(User).filter_by(tg_id=owner_tg_id).first() if not u: u = User(tg_id=owner_tg_id, name="") session.add(u); session.commit(); session.refresh(u) # создать словарь d = SpamDictionary( owner_user_id=u.id, name=params["name"], category=params["category"], kind=params["kind"], lang=params.get("lang"), ) session.add(d); session.commit(); session.refresh(d) # автопривязка к дефолт-политике владельца (Balanced), чтобы потом /dicts работал сразу p = _get_or_create_policy(session, owner_tg_id) if not session.query(PolicyDictionaryLink).filter_by(policy_id=p.id, dictionary_id=d.id).first(): session.add(PolicyDictionaryLink(policy_id=p.id, dictionary_id=d.id)) session.commit() # добавить записи n = 0 for pat in entries: pat = pat.strip() if not pat or pat.startswith("#"): continue session.add(DictionaryEntry(dictionary_id=d.id, pattern=pat, is_regex=(params["kind"] == "regex"))) n += 1 session.commit() return n # ---------- /security ---------- async def security_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): with get_session() as s: p = _get_or_create_policy(s, update.effective_user.id) chat = update.effective_chat bound = enabled = False if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL): cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first() if cs and cs.policy_id == p.id: bound, enabled = True, cs.enabled # показать панель msg = await update.effective_message.reply_text( f"Политика «{p.name}».", reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled) ) # ----- ЗАМОК панели: только инициатор может нажимать ----- locks = ctx.chat_data.setdefault("security_locks", {}) locks[msg.message_id] = update.effective_user.id # TTL/очистку можно сделать по таймеру, если нужно async def security_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() # замок на сообщение lock_owner = ctx.chat_data.get("security_locks", {}).get(q.message.message_id) if lock_owner and lock_owner != update.effective_user.id: await q.answer("Эта панель открыта другим админом. Запустите /security для своей.", show_alert=True) return # должен быть админом чата try: m = await ctx.bot.get_chat_member(update.effective_chat.id, update.effective_user.id) if m.status not in ("administrator", "creator"): await q.answer("Недостаточно прав.", show_alert=True); return except Exception: await q.answer("Недостаточно прав.", show_alert=True); return parts = (q.data or "").split(":") if len(parts) < 2 or parts[0] != "pol": return action = parts[1] with get_session() as s: pid = int(parts[-1]) p = s.get(SecurityPolicy, pid) if not p: await q.edit_message_text("Политика не найдена.") return if action == "toggle": field = parts[2] setattr(p, field, not getattr(p, field)) s.commit() elif action == "adj": field, delta = parts[2], int(parts[3]) val = getattr(p, field) setattr(p, field, max(0, val + delta)) s.commit() elif action == "cycle_action": order = ["delete", "warn", "timeout", "ban", "none"] cur = p.enforce_action_default p.enforce_action_default = order[(order.index(cur) + 1) % len(order)] if cur in order else "delete" s.commit() elif action == "bind_here": chat = update.effective_chat if chat.type not in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL): await q.edit_message_text("Жмите в группе/канале.") return cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first() if not cs: cs = ChatSecurity(chat_id=chat.id, policy_id=p.id, enabled=False) s.add(cs) else: cs.policy_id = p.id s.commit() elif action == "toggle_chat": chat = update.effective_chat cs = s.query(ChatSecurity).filter_by(chat_id=chat.id, policy_id=pid).first() if cs: cs.enabled = not cs.enabled s.commit() # перерисовать клавиатуру с учётом чата chat = update.effective_chat bound = enabled = False if chat and chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL): cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first() if cs and cs.policy_id == p.id: bound, enabled = True, cs.enabled await q.edit_message_reply_markup(reply_markup=kb_policy(p, chat_bound=bound, enabled=enabled)) # ---------- словари: список и тоггл ---------- def _kb_dicts(policy_id: int, rows: list[tuple[int, str, bool]]): # rows: [(dict_id, "Имя (категория/kind)", is_linked)] kbd = [] for did, title, linked in rows: mark = "✅" if linked else "▫️" kbd.append([InlineKeyboardButton(f"{mark} {title}", callback_data=f"dict:toggle:{policy_id}:{did}")]) return InlineKeyboardMarkup(kbd) if kbd else None async def dicts_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): chat = update.effective_chat user_id = update.effective_user.id # админ-проверка в группах if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP): try: m = await ctx.bot.get_chat_member(chat.id, user_id) if m.status not in ("administrator", "creator"): return except Exception: return with get_session() as s: # выбрать политику: в группе — политика чата; в ЛС — дефолт владельца if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP, ChatType.CHANNEL): cs = s.query(ChatSecurity).filter_by(chat_id=chat.id).first() if not cs: await update.effective_message.reply_text("Политика не привязана. Откройте /security и привяжите к чату.") return p = s.get(SecurityPolicy, cs.policy_id) else: p = _get_or_create_policy(s, user_id) u = s.query(User).filter_by(tg_id=user_id).first() dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all() linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()} rows = [] for d in dicts[:100]: title = f"{d.name} ({d.category}/{d.kind})" rows.append((d.id, title, d.id in linked)) kb = _kb_dicts(p.id, rows) if not rows: await update.effective_message.reply_text("У вас пока нет словарей. Импортируйте через /spam_import в ЛС.") return await update.effective_message.reply_text( f"Словари для политики «{p.name}» (нажмите для прикрепления/открепления):", reply_markup=kb ) async def dicts_cb(update: Update, ctx: ContextTypes.DEFAULT_TYPE): q = update.callback_query await q.answer() parts = (q.data or "").split(":") if len(parts) != 4 or parts[0] != "dict" or parts[1] != "toggle": return policy_id = int(parts[2]); dict_id = int(parts[3]) # только админ try: m = await ctx.bot.get_chat_member(update.effective_chat.id, update.effective_user.id) if m.status not in ("administrator", "creator"): await q.answer("Недостаточно прав.", show_alert=True); return except Exception: await q.answer("Недостаточно прав.", show_alert=True); return with get_session() as s: p = s.get(SecurityPolicy, policy_id) if not p: await q.edit_message_text("Политика не найдена.") return link = s.query(PolicyDictionaryLink).filter_by(policy_id=policy_id, dictionary_id=dict_id).first() if link: s.delete(link); s.commit() else: s.add(PolicyDictionaryLink(policy_id=policy_id, dictionary_id=dict_id)); s.commit() u = s.query(User).filter_by(tg_id=update.effective_user.id).first() dicts = s.query(SpamDictionary).filter_by(owner_user_id=u.id).order_by(SpamDictionary.created_at.desc()).all() linked = {x.dictionary_id for x in s.query(PolicyDictionaryLink).filter_by(policy_id=p.id).all()} rows = [] for d in dicts[:100]: title = f"{d.name} ({d.category}/{d.kind})" rows.append((d.id, title, d.id in linked)) await q.edit_message_reply_markup(reply_markup=_kb_dicts(p.id, rows)) # ---------- импорт словаря ---------- async def spam_import_cmd(update: Update, ctx: ContextTypes.DEFAULT_TYPE): # только в ЛС; в main.py уже стоит фильтр, но на всякий if update.effective_chat.type != ChatType.PRIVATE: await update.effective_message.reply_text("Эту команду нужно выполнить в личке со мной.") return ctx.user_data["await_dict_file"] = True ctx.user_data.pop("dict_params", None) await update.effective_message.reply_text( "Пришлите .txt/.csv ФАЙЛОМ — один паттерн на строку.\n" "Подпись (необязательно): name=RU_spam; category=spam|scam|adult|profanity|custom; kind=plain|regex; lang=ru" ) async def spam_import_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if update.effective_chat.type != ChatType.PRIVATE or not ctx.user_data.get("await_dict_file"): return doc = update.message.document if update.message else None if not doc: return await update.effective_message.reply_text(f"Файл получен: {doc.file_name or 'без имени'} — импортирую…") try: f = await doc.get_file() bio = io.BytesIO() await f.download_to_memory(out=bio) bio.seek(0) text = _decode_bytes(bio.read()) lines = [l.strip() for l in text.splitlines() if l.strip()] if not lines: await update.effective_message.reply_text("Файл пуст. Добавьте строки с паттернами.") return params = _parse_params(update.message.caption, doc.file_name or "dict") with get_session() as s: n = _import_entries(s, update.effective_user.id, params, lines) ctx.user_data.pop("await_dict_file", None) await update.effective_message.reply_text(f"Импортировано {n} записей в словарь «{params['name']}».") except Exception as e: await update.effective_message.reply_text(f"Ошибка импорта: {e}") async def spam_import_text_capture(update: Update, ctx: ContextTypes.DEFAULT_TYPE): # перехватываем только в ЛС и только если ждём словарь — И НЕ блокируем цепочку (block=False в main.py) if update.effective_chat.type != ChatType.PRIVATE or not ctx.user_data.get("await_dict_file"): return txt = (update.effective_message.text or "").strip() if not txt: return # если текст похож на "параметры", просто запомним и попросим файл if ("=" in txt) and (";" in txt) and (len(txt.split()) <= 6): ctx.user_data["dict_params"] = txt await update.effective_message.reply_text("Параметры принял. Теперь пришлите .txt/.csv ФАЙЛОМ со словарём.") return # иначе считаем, что прислан словарь "в лоб" lines = [l.strip() for l in txt.splitlines() if l.strip()] params = _parse_params(ctx.user_data.get("dict_params"), "inline_dict") try: with get_session() as s: n = _import_entries(s, update.effective_user.id, params, lines) ctx.user_data.pop("await_dict_file", None) ctx.user_data.pop("dict_params", None) await update.effective_message.reply_text( f"Импортировано {n} записей (из текста) в словарь «{params['name']}»." ) except Exception as e: await update.effective_message.reply_text(f"Ошибка импорта: {e}")