Files
tg_post_min/app/moderation/engine.py
Andrey K. Choi c16ec54891 Bot become a Community Guard & Post send manager
added: dictionary support for censore
message/user management with dict triggers
2025-08-22 21:44:14 +09:00

339 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/moderation/engine.py
import re
from typing import List, Optional
from datetime import datetime, timedelta
from urllib.parse import urlparse
# Быстрый и стабильный хеш контента
try:
import xxhash
def _hash_init():
return xxhash.xxh3_128()
def _hash_hex(h): # noqa: ANN001
return h.hexdigest()
except Exception: # fallback на hashlib
import hashlib
def _hash_init():
return hashlib.sha256()
def _hash_hex(h): # noqa: ANN001
return h.hexdigest()
# Быстрый поиск plain-терминов
try:
import ahocorasick # pyahocorasick
except Exception:
ahocorasick = None # не критично — останутся только regex-словаря
from sqlalchemy import select, func
from app.moderation.cache import TTLCache
from app.infra.redis_client import get_redis
from app.db.models import (
SecurityPolicy, ChatSecurity, Delivery,
SpamDictionary, DictionaryEntry, PolicyDictionaryLink, DomainRule,
ModerationLog, UserStrike, MessageEvent,
)
# Absolute http(s) links; a match stops at whitespace or a closing parenthesis.
URL_RE = re.compile(r'https?://[^\s)]+', flags=re.IGNORECASE)
# @mentions: '@' followed by word characters (Unicode-aware).
MENTION_RE = re.compile(r'@\w+', flags=re.UNICODE)
# Short-lived caches that cut down on DB queries and matcher (re)compilation.
# chat_id -> policy snapshot dict
policy_cache = TTLCache(max_size=4096, ttl_seconds=60)
# policy_id -> (Aho-Corasick automaton or None, [compiled regex, ...])
dict_cache = TTLCache(max_size=512, ttl_seconds=60)
# policy_id -> (whitelist_set, blacklist_set)
domain_cache = TTLCache(max_size=1024, ttl_seconds=60)
def snapshot_policy(p: SecurityPolicy) -> dict:
    """Copy the moderation-relevant fields of policy *p* into a plain dict.

    The keys are the attribute names themselves, in a fixed order.
    ``get_policy_for_chat`` stores this snapshot in ``policy_cache``.
    """
    fields = (
        "id",
        "cooldown_seconds",
        "duplicate_window_seconds",
        "max_links",
        "max_mentions",
        "use_whitelist",
        "block_adult",
        "block_spam",
        "block_scam",
        "block_profanity",
        "enforce_action_default",
        "timeout_minutes",
        "strikes_to_warn",
        "strikes_to_timeout",
        "strikes_to_ban",
        "user_msg_per_minute",
    )
    return {name: getattr(p, name) for name in fields}
def compute_content_hash(text: str, media_ids: List[str]) -> str:
    """Return a hex digest of a message's text followed by its media ids.

    Hashed byte layout: UTF-8 ``text``, then ``b"|" + media_id`` for every
    media id, in order. ``None``/empty values hash as an empty string.
    ``check_message_allowed`` uses the result as ``content_hash`` for
    duplicate detection, so this layout must stay stable.

    Values are encoded to bytes explicitly. The hashlib fallback (used when
    ``xxhash`` is missing) only accepts bytes, so passing ``str`` raised
    ``TypeError``. NOTE(review): xxhash's binding is believed to hash ``str``
    as UTF-8, in which case existing digests are unchanged — confirm.
    """

    def _utf8(value) -> bytes:  # noqa: ANN001
        # surrogatepass: don't crash on text carrying lone surrogates.
        if isinstance(value, bytes):
            return value
        return str(value).encode("utf-8", "surrogatepass")

    h = _hash_init()
    h.update(_utf8(text or ""))
    for m in media_ids or []:
        h.update(b"|")
        h.update(_utf8(m or ""))
    return _hash_hex(h)
def _find_domains(text: str) -> list[str]:
    """Extract normalized host names from the http(s) links in *text*.

    Each link matched by ``URL_RE`` is reduced to its host, with these
    changes applied:
    - lower-cased;
    - userinfo (``user@``) and port (``:8080``) removed;
    - trailing dots and a leading ``www.`` removed.

    The previous version used ``netloc`` verbatim. As a result
    ``https://user@bad.com:443/`` or ``https://bad.com./`` never equalled a
    blacklisted ``bad.com`` and slipped through.

    Hosts are returned in order of appearance, duplicates included.
    Unparseable links are skipped.
    """
    domains: list[str] = []
    for url in URL_RE.findall(text or ""):
        try:
            # .hostname is lower-cased and excludes userinfo and port.
            host = urlparse(url).hostname
        except ValueError:  # e.g. a malformed IPv6 literal such as "http://[::1"
            continue
        if not host:
            continue
        host = host.rstrip(".")
        if host.startswith("www."):
            host = host[4:]
        if host:
            domains.append(host)
    return domains
def get_policy_for_chat(session, chat_id: int) -> Optional[SecurityPolicy]:
    """Return the policy bound to *chat_id* by an enabled ChatSecurity row, else None.

    A cached snapshot only supplies the policy id. The policy object itself
    is always fetched through ``session.get``. Only successful lookups are
    cached.
    """
    cached = policy_cache.get(chat_id)
    if cached:
        return session.get(SecurityPolicy, cached["id"])

    binding_stmt = select(ChatSecurity).where(
        ChatSecurity.chat_id == chat_id,
        ChatSecurity.enabled.is_(True),
    )
    binding = session.execute(binding_stmt).scalar_one_or_none()
    if not binding:
        return None

    policy = session.get(SecurityPolicy, binding.policy_id)
    if policy:
        policy_cache.set(chat_id, snapshot_policy(policy))
    return policy
def _active_dicts(session, policy: SecurityPolicy) -> list[SpamDictionary]:
    """Return the dictionaries that apply to *policy*, de-duplicated by id.

    Two sources are merged, linked ones first:
    - dictionaries explicitly linked to the policy via ``PolicyDictionaryLink``;
    - global dictionaries (``owner_user_id IS NULL``) whose ``category`` is
      one the policy blocks (adult / spam / scam / profanity).

    When the policy blocks no category, the global query is skipped
    entirely. Previously a Python ``False`` was sent to the database as a
    WHERE clause.
    """
    linked = session.execute(
        select(SpamDictionary)
        .join(PolicyDictionaryLink, PolicyDictionaryLink.dictionary_id == SpamDictionary.id)
        .where(PolicyDictionaryLink.policy_id == policy.id)
    ).scalars().all()

    category_flags = (
        ("adult", policy.block_adult),
        ("spam", policy.block_spam),
        ("scam", policy.block_scam),
        ("profanity", policy.block_profanity),
    )
    cats = [name for name, enabled in category_flags if enabled]

    globals_by_cat = []
    if cats:
        globals_by_cat = session.execute(
            select(SpamDictionary).where(
                SpamDictionary.owner_user_id.is_(None),
                SpamDictionary.category.in_(cats),
            )
        ).scalars().all()

    # Unique by id; dict keeps first-seen order (linked dictionaries first).
    unique = {d.id: d for d in (*linked, *globals_by_cat)}
    return list(unique.values())
def _compile_dicts(session, policy: SecurityPolicy):
    """Build (and cache per policy id) matchers for the policy's dictionaries.

    Returns ``(ac, regex_list)``:
    - ``ac``: a pyahocorasick automaton over lower-cased plain terms, or
      None when there are no plain terms or pyahocorasick is unavailable.
    - ``regex_list``: case-insensitive compiled patterns. These come from
      entries with ``is_regex`` set or belonging to a dictionary of
      ``kind == "regex"``. Invalid or missing patterns are skipped.

    Without pyahocorasick, plain terms used to be dropped silently, which
    disabled every plain-term dictionary. They are now compiled into one
    escaped, case-insensitive alternation appended to ``regex_list``.
    NOTE(review): IGNORECASE folding may differ from ``str.lower()`` for a
    few exotic characters.
    """
    cached = dict_cache.get(policy.id)
    if cached is not None:
        return cached

    dicts = _active_dicts(session, policy)
    if not dicts:
        dict_cache.set(policy.id, (None, []))
        return (None, [])

    kinds = {d.id: d.kind for d in dicts}
    ids = [d.id for d in dicts]
    entries = session.execute(
        select(DictionaryEntry).where(DictionaryEntry.dictionary_id.in_(ids))
    ).scalars().all()

    regex_list: list[re.Pattern] = []
    plain_terms: dict[str, None] = {}  # ordered set of unique lower-cased terms
    for e in entries:
        if e.is_regex or kinds.get(e.dictionary_id) == "regex":
            try:
                regex_list.append(re.compile(e.pattern, re.IGNORECASE))
            except (re.error, TypeError):  # TypeError: pattern is None
                continue
        else:
            term = (e.pattern or "").lower()
            if term:
                plain_terms[term] = None

    ac = None
    if plain_terms:
        if ahocorasick is not None:
            ac = ahocorasick.Automaton()
            for term in plain_terms:
                ac.add_word(term, term)
            ac.make_automaton()
        else:
            # Fallback so plain-term dictionaries are still enforced.
            regex_list.append(
                re.compile("|".join(map(re.escape, plain_terms)), re.IGNORECASE)
            )

    result = (ac, regex_list)
    dict_cache.set(policy.id, result)
    return result
def _domain_sets(session, policy: SecurityPolicy) -> tuple[set[str], set[str]]:
    """Return ``(whitelist, blacklist)`` domain sets for *policy*, cached by policy id.

    Stored rule domains are normalized to the form ``_find_domains``
    produces for message links:
    - whitespace stripped and lower-cased;
    - trailing dots and a leading ``www.`` removed.

    Without this, a rule saved as ``Example.com`` or ``www.example.com``
    could never match and was silently ineffective. Both rule kinds are
    loaded with a single query.
    """
    cached = domain_cache.get(policy.id)
    if cached is not None:
        return cached

    rules = session.execute(
        select(DomainRule).where(
            DomainRule.policy_id == policy.id,
            DomainRule.kind.in_(("whitelist", "blacklist")),
        )
    ).scalars().all()

    wl_set: set[str] = set()
    bl_set: set[str] = set()
    for rule in rules:
        domain = (rule.domain or "").strip().lower().rstrip(".")
        if domain.startswith("www."):
            domain = domain[4:]
        if not domain:
            continue
        if rule.kind == "whitelist":
            wl_set.add(domain)
        else:
            bl_set.add(domain)

    result = (wl_set, bl_set)
    domain_cache.set(policy.id, result)
    return result
# ==========================
# Outgoing: проверка рассылки
# ==========================
def check_message_allowed(session, chat_id: int, owner_user_id: int, text: str, media_ids: List[str]):
    """Check an outgoing message against the policy bound to *chat_id*.

    Returns ``(ok, reasons, content_hash)``:
    - ``ok``: True when no rule was violated, i.e. ``reasons`` is empty.
    - ``reasons``: every violated rule. The possible rules are cooldown,
      duplicate, link/mention limits, domain rules and dictionary match.
    - ``content_hash``: hash of text + media ids, for anti-duplicate checks.

    Chats without an enabled policy always pass. ``owner_user_id`` is not
    used by this function.
    """
    body = text or ""
    policy = get_policy_for_chat(session, chat_id)
    reasons: list[str] = []
    content_hash = compute_content_hash(body, media_ids or [])

    # No policy bound to the chat -> allow.
    if not policy:
        return True, reasons, content_hash

    now = datetime.utcnow()

    # 1) Cooldown since the last successful delivery to this chat.
    last = session.execute(
        select(Delivery)
        .where(Delivery.chat_id == chat_id, Delivery.status == "sent")
        .order_by(Delivery.created_at.desc())
        .limit(1)
    ).scalars().first()
    if last and (now - last.created_at).total_seconds() < policy.cooldown_seconds:
        reasons.append(f"cooldown<{policy.cooldown_seconds}s")

    # 2) The same content already sent to this chat within the window.
    if policy.duplicate_window_seconds > 0:
        since = now - timedelta(seconds=policy.duplicate_window_seconds)
        dupe = session.execute(
            select(Delivery)
            .where(
                Delivery.chat_id == chat_id,
                Delivery.content_hash == content_hash,
                Delivery.created_at >= since,
                Delivery.status == "sent",
            )
            .limit(1)
        ).scalars().first()
        if dupe:
            reasons.append("duplicate")

    # 3) Link / mention limits (a negative limit disables the check).
    links = URL_RE.findall(body)
    if policy.max_links >= 0 and len(links) > policy.max_links:
        reasons.append(f"links>{policy.max_links}")
    mentions = MENTION_RE.findall(body)
    if policy.max_mentions >= 0 and len(mentions) > policy.max_mentions:
        reasons.append(f"mentions>{policy.max_mentions}")

    # 4) Domain rules: whitelist mode when enabled and non-empty, else blacklist.
    wl, bl = _domain_sets(session, policy)
    domains = _find_domains(body)
    if policy.use_whitelist and wl:
        bad = {d for d in domains if d not in wl}
        if bad:
            reasons.append("not_whitelisted:" + ",".join(sorted(bad)))
    else:
        bad = {d for d in domains if d in bl}
        if bad:
            reasons.append("blacklisted:" + ",".join(sorted(bad)))

    # 5) Dictionaries: plain terms via Aho-Corasick, then regexes. Regexes are
    # skipped only when a plain term already matched. Earlier code skipped
    # them whenever ANY other reason existed, leaving ``reasons`` incomplete.
    ac, regex_list = _compile_dicts(session, policy)
    matched = False
    if ac is not None:
        matched = any(True for _ in ac.iter(body.lower()))
    if not matched and regex_list:
        matched = any(r.search(body) for r in regex_list)
    if matched:
        reasons.append("dictionary_match")

    return not reasons, reasons, content_hash
# ==========================
# Ниже — helpers для модерации входящих (если используете «страж» в группе)
# ==========================
async def redis_rate_check(chat_id: int, user_id: int, per_minute: int) -> bool:
    """Return True if the user is within *per_minute* messages for the current window.

    The window is a fixed 60 seconds per ``(chat_id, user_id)``. The counter
    key is created with a 60 s TTL by the first message of a window and is
    only incremented afterwards.

    The previous code re-armed ``EXPIRE 60`` on every message. That slid
    the TTL forward, so the counter never reset while messages kept arriving
    less than a minute apart. For example, with a limit of 10/min, a user
    posting every 30 s was throttled after about five minutes.

    Returns True (no limiting) when ``per_minute <= 0`` or Redis is
    unavailable.
    """
    if per_minute <= 0:
        return True
    r = await get_redis()
    if r is None:
        return True
    key = f"rl:{chat_id}:{user_id}"
    pipe = r.pipeline()
    # SET NX EX creates the key with its TTL only when it does not exist yet.
    # INCR keeps an existing TTL, so the window is fixed rather than sliding.
    pipe.set(key, 0, ex=60, nx=True)
    pipe.incr(key, 1)
    _, cnt = await pipe.execute()
    return int(cnt) <= per_minute
async def redis_dupe_check(chat_id: int, user_id: int, content_hash: str, window_s: int) -> bool:
    """Return False if this user already posted *content_hash* in the chat within *window_s* seconds.

    The check claims a marker key with ``SET ... NX EX window_s``. Only the
    first writer inside the window succeeds. Returns True (not a duplicate)
    when ``window_s <= 0`` or Redis is unavailable.
    """
    if window_s <= 0:
        return True
    client = await get_redis()
    if client is not None:
        marker = f"dupe:{chat_id}:{user_id}:{content_hash}"
        first_seen = await client.set(marker, "1", ex=window_s, nx=True)
        return first_seen is True
    return True
def add_strike_and_decide_action(session, policy: SecurityPolicy, chat_id: int, tg_user_id: int) -> str:
    """Add one strike for *tg_user_id* in *chat_id* and return the action to take.

    Policy thresholds are checked from most to least severe:
    - ``strikes_to_ban`` gives "ban";
    - ``strikes_to_timeout`` gives "timeout";
    - ``strikes_to_warn`` gives "warn".
    Below every threshold, the result is ``policy.enforce_action_default``,
    or "delete" when that is empty.

    The strike row is created (if needed) and incremented in a single
    commit. Previously a new row was first committed with 0 strikes and
    refreshed, which cost extra round-trips and was not atomic.
    NOTE(review): this is a read-modify-write; concurrent calls for the same
    user may lose an increment. An SQL-side ``strikes = strikes + 1`` would
    avoid that.
    """
    us = session.execute(
        select(UserStrike).where(UserStrike.chat_id == chat_id, UserStrike.tg_user_id == tg_user_id)
    ).scalar_one_or_none()
    if us is None:
        us = UserStrike(chat_id=chat_id, tg_user_id=tg_user_id, strikes=0)
        session.add(us)
    us.strikes += 1
    us.updated_at = datetime.utcnow()
    # Read before commit: avoids a reload if the session expires attributes on
    # commit (SQLAlchemy's default).
    strikes = us.strikes
    session.commit()

    for threshold, action in (
        (policy.strikes_to_ban, "ban"),
        (policy.strikes_to_timeout, "timeout"),
        (policy.strikes_to_warn, "warn"),
    ):
        if strikes >= threshold:
            return action
    return policy.enforce_action_default or "delete"