# app/moderation/engine.py
"""Moderation engine.

* ``check_message_allowed`` - pre-send check of an outgoing message against the
  security policy bound to a chat (cooldown, duplicates, link/mention limits,
  domain rules, spam dictionaries).
* Helpers for moderating incoming group messages: redis-backed rate limit and
  duplicate checks, and strike escalation.
"""
import re
from datetime import datetime, timedelta
from typing import List, Optional
from urllib.parse import urlparse

from sqlalchemy import select, func

# Fast, stable content hash: xxh3-128 when available, SHA-256 otherwise.
# Importing the constructor directly also falls back when an old xxhash
# (without xxh3_128) is installed.
try:
    from xxhash import xxh3_128 as _hash_factory
except ImportError:
    from hashlib import sha256 as _hash_factory

# Fast multi-term search for plain dictionary terms (optional dependency).
try:
    import ahocorasick  # pyahocorasick
except ImportError:
    ahocorasick = None  # plain terms then fall back to one escaped regex

from app.moderation.cache import TTLCache
from app.infra.redis_client import get_redis
from app.db.models import (
    SecurityPolicy,
    ChatSecurity,
    Delivery,
    SpamDictionary,
    DictionaryEntry,
    PolicyDictionaryLink,
    DomainRule,
    ModerationLog,
    UserStrike,
    MessageEvent,
)

URL_RE = re.compile(r'https?://[^\s)]+', re.IGNORECASE)
MENTION_RE = re.compile(r'@\w+', re.UNICODE)

# URL_RE is greedy, so sentence punctuation / quotes can end up glued to the
# host part; these characters are stripped from the end of a host.
_HOST_TRAILING_JUNK = ".,;:!?'\">]"

# Caches (fewer DB queries and regex/automaton compilations); 60 s TTL.
policy_cache = TTLCache(ttl_seconds=60, max_size=4096)  # chat_id -> snapshot (dict)
dict_cache = TTLCache(ttl_seconds=60, max_size=512)  # policy_id -> (ac_automaton | None, [regex, ...])
domain_cache = TTLCache(ttl_seconds=60, max_size=1024)  # policy_id -> (whitelist_set, blacklist_set)


def snapshot_policy(p: SecurityPolicy) -> dict:
    """Return a plain-dict copy of the policy fields (safe to keep in a cache)."""
    return {
        "id": p.id,
        "cooldown_seconds": p.cooldown_seconds,
        "duplicate_window_seconds": p.duplicate_window_seconds,
        "max_links": p.max_links,
        "max_mentions": p.max_mentions,
        "use_whitelist": p.use_whitelist,
        "block_adult": p.block_adult,
        "block_spam": p.block_spam,
        "block_scam": p.block_scam,
        "block_profanity": p.block_profanity,
        "enforce_action_default": p.enforce_action_default,
        "timeout_minutes": p.timeout_minutes,
        "strikes_to_warn": p.strikes_to_warn,
        "strikes_to_timeout": p.strikes_to_timeout,
        "strikes_to_ban": p.strikes_to_ban,
        "user_msg_per_minute": p.user_msg_per_minute,
    }


def _hash_init():
    """Create a fresh hasher (xxh3-128 or SHA-256, see the import above)."""
    return _hash_factory()


def _hash_hex(h) -> str:  # noqa: ANN001
    """Hex digest of a hasher created by ``_hash_init``."""
    return h.hexdigest()


def compute_content_hash(text: str, media_ids: List[str]) -> str:
    """Hash of the message content: the text plus the ``|``-separated media ids.

    Hashers accept bytes only (hashlib rejects ``str``), so everything is
    UTF-8 encoded; for xxhash this yields the same digest as passing ``str``.
    ``surrogatepass`` keeps lone surrogates from raising.
    """
    h = _hash_init()
    h.update((text or "").encode("utf-8", "surrogatepass"))
    for m in media_ids or []:
        h.update(b"|")
        h.update(str(m or "").encode("utf-8", "surrogatepass"))
    return _hash_hex(h)


def _normalize_domain(domain: Optional[str]) -> str:
    """Canonical domain form for comparisons.

    Lower-cases the domain and strips trailing punctuation (including the
    trailing root dot) and a leading ``www.``. Returns "" for empty input.
    """
    d = (domain or "").strip().lower().rstrip(_HOST_TRAILING_JUNK)
    if d.startswith("www."):
        d = d[4:]
    return d


def _find_domains(text: str) -> list[str]:
    """Normalised host names of every http(s) URL in ``text``, in order.

    Uses ``hostname`` rather than ``netloc`` so ports and ``user@`` prefixes
    cannot disguise a domain (``https://evil.com:443``,
    ``https://google.com@evil.com``).
    """
    domains: list[str] = []
    for url in URL_RE.findall(text or ""):
        try:
            host = urlparse(url).hostname
        except ValueError:  # e.g. malformed IPv6 literal
            continue
        host = _normalize_domain(host)
        if host:
            domains.append(host)
    return domains


def _domain_in(domain: str, rules: set[str]) -> bool:
    """True if ``domain`` or any of its parent domains is listed in ``rules``."""
    labels = domain.split(".")
    return any(".".join(labels[i:]) in rules for i in range(len(labels)))


def get_policy_for_chat(session, chat_id: int) -> Optional[SecurityPolicy]:
    """Return the active (enabled) policy bound to the chat, or None.

    Positive lookups are cached for the cache TTL; only the policy id from the
    cached snapshot is used, and the ORM object is re-fetched via the session.
    """
    snap = policy_cache.get(chat_id)
    if snap is not None:
        return session.get(SecurityPolicy, snap["id"])

    cs = session.execute(
        select(ChatSecurity).where(
            ChatSecurity.chat_id == chat_id,
            ChatSecurity.enabled.is_(True),
        )
    ).scalar_one_or_none()
    if not cs:
        return None

    p = session.get(SecurityPolicy, cs.policy_id)
    if p:
        policy_cache.set(chat_id, snapshot_policy(p))
    return p


def _active_dicts(session, policy: SecurityPolicy) -> list[SpamDictionary]:
    """Dictionaries that apply to ``policy``, de-duplicated by id.

    These are the dictionaries explicitly linked to the policy, plus the
    global (ownerless) dictionaries of every category the policy enables.
    """
    linked = session.execute(
        select(SpamDictionary)
        .join(PolicyDictionaryLink, PolicyDictionaryLink.dictionary_id == SpamDictionary.id)
        .where(PolicyDictionaryLink.policy_id == policy.id)
    ).scalars().all()

    cats = [
        category
        for enabled, category in (
            (policy.block_adult, "adult"),
            (policy.block_spam, "spam"),
            (policy.block_scam, "scam"),
            (policy.block_profanity, "profanity"),
        )
        if enabled
    ]

    globals_by_cat = []
    if cats:  # no enabled categories -> no query at all
        globals_by_cat = session.execute(
            select(SpamDictionary).where(
                SpamDictionary.owner_user_id.is_(None),
                SpamDictionary.category.in_(cats),
            )
        ).scalars().all()

    unique = {d.id: d for d in (*linked, *globals_by_cat)}
    return list(unique.values())


def _compile_dicts(session, policy: SecurityPolicy):
    """Build (and cache) the matchers for the policy's dictionaries.

    Returns ``(ac, regex_list)``:
    * ``ac`` - an Aho-Corasick automaton over the lower-cased plain terms, or
      None (no plain terms, or pyahocorasick missing);
    * ``regex_list`` - compiled case-insensitive regex entries. Without
      pyahocorasick, the plain terms are added here as one escaped alternation
      so they are still enforced.
    Entries whose regex does not compile are skipped.
    """
    cached = dict_cache.get(policy.id)
    if cached is not None:
        return cached

    dicts = _active_dicts(session, policy)
    if not dicts:
        dict_cache.set(policy.id, (None, []))
        return None, []

    kinds = {d.id: d.kind for d in dicts}
    entries = session.execute(
        select(DictionaryEntry).where(DictionaryEntry.dictionary_id.in_(list(kinds)))
    ).scalars().all()

    regex_list: list[re.Pattern] = []
    plain_terms: set[str] = set()
    for e in entries:
        # An entry is a regex if flagged so itself or if its dictionary is a regex dictionary.
        if e.is_regex or kinds.get(e.dictionary_id) == "regex":
            try:
                regex_list.append(re.compile(e.pattern, re.IGNORECASE))
            except (re.error, TypeError):  # invalid or missing pattern
                continue
        else:
            term = (e.pattern or "").lower()
            if term:
                plain_terms.add(term)

    ac = None
    if plain_terms:
        if ahocorasick is not None:
            ac = ahocorasick.Automaton()
            for term in plain_terms:
                ac.add_word(term, term)
            ac.make_automaton()
        else:
            regex_list.append(
                re.compile("|".join(re.escape(t) for t in sorted(plain_terms)), re.IGNORECASE)
            )

    result = (ac, regex_list)
    dict_cache.set(policy.id, result)
    return result


def _domain_sets(session, policy: SecurityPolicy) -> tuple[set[str], set[str]]:
    """(whitelist, blacklist) of normalised domains for the policy (cached)."""
    cached = domain_cache.get(policy.id)
    if cached is not None:
        return cached

    rules = session.execute(
        select(DomainRule).where(
            DomainRule.policy_id == policy.id,
            DomainRule.kind.in_(("whitelist", "blacklist")),
        )
    ).scalars().all()

    wl_set: set[str] = set()
    bl_set: set[str] = set()
    for rule in rules:
        # Normalise like _find_domains does, otherwise "WWW.Example.com" never matches.
        domain = _normalize_domain(rule.domain)
        if not domain:
            continue
        (wl_set if rule.kind == "whitelist" else bl_set).add(domain)

    domain_cache.set(policy.id, (wl_set, bl_set))
    return wl_set, bl_set


def _dictionary_hit(text: str, ac, regex_list: list[re.Pattern]) -> bool:
    """True if any plain term (automaton, over lower-cased text) or regex matches."""
    if ac is not None:
        for _ in ac.iter(text.lower()):
            return True
    return any(r.search(text) for r in regex_list)


# ==========================
# Outgoing: mailing checks
# ==========================

def check_message_allowed(
    session, chat_id: int, owner_user_id: int, text: str, media_ids: List[str]
) -> tuple[bool, list[str], str]:
    """
    Check a message before sending it to a specific chat, using the policy
    bound to that chat.

    Returns a tuple ``(ok, reasons, content_hash)``:
    * ``reasons`` - the blocking reasons (non-empty exactly when ``ok`` is False);
    * ``content_hash`` - hash of the content (text + list of media ids), used
      for duplicate detection.

    If no enabled policy is bound to the chat, the message is allowed.
    ``owner_user_id`` is currently unused.
    """
    text = text or ""
    media_ids = media_ids or []
    reasons: list[str] = []
    content_hash = compute_content_hash(text, media_ids)

    policy = get_policy_for_chat(session, chat_id)
    if not policy:
        return True, reasons, content_hash

    # Naive UTC: subtracting from Delivery.created_at requires a naive value.
    # NOTE(review): assumes created_at is stored as naive UTC - confirm.
    now = datetime.utcnow()

    # 1) Cooldown since the last successful send to this chat.
    cooldown = policy.cooldown_seconds or 0
    if cooldown > 0:
        last = session.execute(
            select(Delivery)
            .where(Delivery.chat_id == chat_id, Delivery.status == "sent")
            .order_by(Delivery.created_at.desc())
            .limit(1)
        ).scalars().first()
        if last is not None and last.created_at is not None:
            if (now - last.created_at).total_seconds() < cooldown:
                reasons.append(f"cooldown<{policy.cooldown_seconds}s")

    # 2) Same content already sent to this chat within the window.
    window = policy.duplicate_window_seconds or 0
    if window > 0:
        since = now - timedelta(seconds=window)
        dupe = session.execute(
            select(Delivery)
            .where(
                Delivery.chat_id == chat_id,
                Delivery.content_hash == content_hash,
                Delivery.created_at >= since,
                Delivery.status == "sent",
            )
            .limit(1)
        ).scalars().first()
        if dupe:
            reasons.append("duplicate")

    # 3) Link and mention limits (a negative or missing limit disables the check).
    max_links = policy.max_links
    if max_links is not None and max_links >= 0 and len(URL_RE.findall(text)) > max_links:
        reasons.append(f"links>{policy.max_links}")
    max_mentions = policy.max_mentions
    if max_mentions is not None and max_mentions >= 0 and len(MENTION_RE.findall(text)) > max_mentions:
        reasons.append(f"mentions>{policy.max_mentions}")

    # 4) Domain rules: whitelist mode (exact match) when enabled and non-empty,
    #    otherwise blacklist, which also covers subdomains of a listed domain.
    wl, bl = _domain_sets(session, policy)
    domains = _find_domains(text)
    if policy.use_whitelist and wl:
        bad = {d for d in domains if d not in wl}
        if bad:
            reasons.append("not_whitelisted:" + ",".join(sorted(bad)))
    elif bl:
        bad = {d for d in domains if _domain_in(d, bl)}
        if bad:
            reasons.append("blacklisted:" + ",".join(sorted(bad)))

    # 5) Dictionaries (plain terms + regex); reported regardless of other reasons.
    ac, regex_list = _compile_dicts(session, policy)
    if _dictionary_hit(text, ac, regex_list):
        reasons.append("dictionary_match")

    return not reasons, reasons, content_hash


# ==========================
# Helpers for moderating incoming messages (when the bot acts as a group guard)
# ==========================

async def redis_rate_check(chat_id: int, user_id: int, per_minute: int) -> bool:
    """True if the user is within ``per_minute`` messages per fixed 60 s window.

    The counter key is created with a 60 s TTL only when absent (SET NX EX);
    INCR keeps that TTL. The previous EXPIRE-on-every-message extended the
    window forever for steadily posting users, so their counter never reset.
    Fails open (returns True) when the limit is disabled or redis is unavailable.
    """
    if not per_minute or per_minute <= 0:
        return True
    r = await get_redis()
    if r is None:
        return True
    key = f"rl:{chat_id}:{user_id}"
    pipe = r.pipeline()
    pipe.set(key, 0, ex=60, nx=True)
    pipe.incr(key, 1)
    _, cnt = await pipe.execute()
    return int(cnt) <= per_minute


async def redis_dupe_check(chat_id: int, user_id: int, content_hash: str, window_s: int) -> bool:
    """True if this content was NOT seen from the user in the last ``window_s`` s.

    Atomically records the content hash (SET NX EX); a failed SET means it is a
    duplicate. Fails open when the window is disabled or redis is unavailable.
    """
    if not window_s or window_s <= 0:
        return True
    r = await get_redis()
    if r is None:
        return True
    key = f"dupe:{chat_id}:{user_id}:{content_hash}"
    ok = await r.set(key, "1", ex=window_s, nx=True)
    return bool(ok)


def add_strike_and_decide_action(session, policy: SecurityPolicy, chat_id: int, tg_user_id: int) -> str:
    """
    Add one strike for the user in the chat and return the action to apply:
    "ban" / "timeout" / "warn" when the corresponding policy threshold is
    reached (checked in that order), otherwise the policy default action or
    "delete". Missing (None) thresholds are skipped.

    Commits the session.
    NOTE(review): read-modify-write; concurrent strikes for the same user may
    race - confirm whether that matters for callers.
    """
    us = session.execute(
        select(UserStrike).where(UserStrike.chat_id == chat_id, UserStrike.tg_user_id == tg_user_id)
    ).scalar_one_or_none()
    if us is None:
        us = UserStrike(chat_id=chat_id, tg_user_id=tg_user_id, strikes=0)
        session.add(us)

    strikes = (us.strikes or 0) + 1
    us.strikes = strikes
    us.updated_at = datetime.utcnow()
    session.commit()  # one commit covers both the insert and the increment

    for threshold, action in (
        (policy.strikes_to_ban, "ban"),
        (policy.strikes_to_timeout, "timeout"),
        (policy.strikes_to_warn, "warn"),
    ):
        if threshold is not None and strikes >= threshold:
            return action
    return policy.enforce_action_default or "delete"